futures: api cleanup - removed final and nullable
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/7dbe2a39 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/7dbe2a39 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/7dbe2a39 Branch: refs/heads/ignite-337 Commit: 7dbe2a3988958d1f9876a3ceece9df785a770170 Parents: af01236 Author: Yakov Zhdanov <yzhda...@gridgain.com> Authored: Mon Mar 9 20:59:03 2015 +0300 Committer: Yakov Zhdanov <yzhda...@gridgain.com> Committed: Mon Mar 9 20:59:03 2015 +0300 ---------------------------------------------------------------------- .../ignite/internal/IgniteInternalFuture.java | 10 + .../connection/GridClientNioTcpConnection.java | 5 +- .../GridStreamerStageExecutionFuture.java | 5 - .../processors/streamer/IgniteStreamerImpl.java | 5 - .../ignite/internal/util/IgniteUtils.java | 4 +- .../util/future/GridFinishedFuture.java | 66 +++-- .../internal/util/future/GridFutureAdapter.java | 12 +- .../util/nio/GridNioEmbeddedFuture.java | 9 +- .../util/nio/GridNioFinishedFuture.java | 77 +---- .../ignite/internal/util/nio/GridNioFuture.java | 84 +----- .../internal/util/nio/GridNioFutureImpl.java | 279 +------------------ .../ignite/internal/util/nio/GridNioServer.java | 5 +- .../util/nio/GridTcpNioCommunicationClient.java | 18 +- .../communication/tcp/TcpCommunicationSpi.java | 6 +- .../util/future/nio/GridNioFutureSelfTest.java | 8 +- .../HadoopExternalCommunication.java | 7 +- .../HadoopTcpNioCommunicationClient.java | 12 +- 17 files changed, 95 insertions(+), 517 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7dbe2a39/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java index 88520d7..2b7b821 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java @@ -120,4 +120,14 @@ public interface IgniteInternalFuture<R> { * @return Chained future that finishes after this future completes and done callback is called. */ public <T> IgniteInternalFuture<T> chain(IgniteClosure<? super IgniteInternalFuture<R>, T> doneCb); + + /** + * @return Error value if future has already been completed with error. + */ + public Throwable error(); + + /** + * @return Result value if future has already been completed normally. + */ + public R result(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7dbe2a39/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java index a87e681..1f70385 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.client.impl.connection; import org.apache.ignite.*; +import org.apache.ignite.internal.*; import org.apache.ignite.internal.client.*; import org.apache.ignite.internal.client.impl.*; import org.apache.ignite.internal.client.marshaller.*; @@ -420,8 +421,8 @@ public class GridClientNioTcpConnection extends GridClientConnection { lastMsgSndTime = U.currentTimeMillis(); if (routeMode) { - sndFut.listen(new CI1<GridNioFuture<?>>() { - @Override public void apply(GridNioFuture<?> sndFut) { + sndFut.listen(new CI1<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> sndFut) { try { sndFut.get(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7dbe2a39/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerStageExecutionFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerStageExecutionFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerStageExecutionFuture.java index af48159..7952c92 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerStageExecutionFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerStageExecutionFuture.java @@ -234,11 +234,6 @@ public class GridStreamerStageExecutionFuture extends GridFutureAdapter<Object> return parentFutId; } - /** {@inheritDoc} */ - @Override public Throwable error() { - return super.error(); - } - /** * @return Map of child executions. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7dbe2a39/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/IgniteStreamerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/IgniteStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/IgniteStreamerImpl.java index 54c032e..98c758b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/IgniteStreamerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/IgniteStreamerImpl.java @@ -1371,10 +1371,5 @@ public class IgniteStreamerImpl implements IgniteStreamerEx, Externalizable { this.w = w; } - - /** {@inheritDoc} */ - @Override public Throwable error() { - return super.error(); - } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7dbe2a39/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java index e9426c9..4dedd8b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java @@ -2787,7 +2787,9 @@ public abstract class IgniteUtils { */ public static void onGridStop(){ synchronized (mux) { - assert gridCnt > 0 : gridCnt; + // Grid start may fail and onGridStart() does not get called. + if (gridCnt == 0) + return; --gridCnt; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7dbe2a39/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java index 379f078..242e626 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.util.future; import org.apache.ignite.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; @@ -28,11 +29,17 @@ import java.util.concurrent.*; * Future that is completed at creation time. */ public class GridFinishedFuture<T> implements IgniteInternalFuture<T> { - /** Complete value. */ - private T t; + /** */ + private static final byte ERR = 1; + + /** */ + private static final byte RES = 2; + + /** */ + private final byte resFlag; - /** Error. */ - private Throwable err; + /** Complete value. */ + private final Object res; /** Start time. */ private final long startTime = U.currentTimeMillis(); @@ -41,7 +48,8 @@ public class GridFinishedFuture<T> implements IgniteInternalFuture<T> { * Creates finished future with complete value. */ public GridFinishedFuture() { - // No-op. + res = null; + resFlag = RES; } /** @@ -50,28 +58,27 @@ public class GridFinishedFuture<T> implements IgniteInternalFuture<T> { * @param t Finished value. */ public GridFinishedFuture(T t) { - this.t = t; + res = t; + resFlag = RES; } /** * @param err Future error. */ public GridFinishedFuture(Throwable err) { - this.err = err; + res = err; + resFlag = ERR; } - /** - * @return Value of error. - */ - protected Throwable error() { - return err; + /** {@inheritDoc} */ + @Override public Throwable error() { + return (resFlag == ERR) ? (Throwable)res : null; } - /** - * @return Value of result. - */ - protected T result() { - return t; + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public T result() { + return resFlag == RES ? (T)res : null; } /** {@inheritDoc} */ @@ -100,11 +107,12 @@ public class GridFinishedFuture<T> implements IgniteInternalFuture<T> { } /** {@inheritDoc} */ + @SuppressWarnings("unchecked") @Override public T get() throws IgniteCheckedException { - if (err != null) - throw U.cast(err); + if (resFlag == ERR) + throw U.cast((Throwable)res); - return t; + return (T)res; } /** {@inheritDoc} */ @@ -126,15 +134,15 @@ public class GridFinishedFuture<T> implements IgniteInternalFuture<T> { /** {@inheritDoc} */ @Override public <R> IgniteInternalFuture<R> chain(final IgniteClosure<? super IgniteInternalFuture<T>, R> doneCb) { - GridFutureAdapter<R> fut = new GridFutureAdapter<R>() { - @Override public String toString() { - return "ChainFuture[orig=" + GridFinishedFuture.this + ", doneCb=" + doneCb + ']'; - } - }; - - listen(new GridFutureChainListener<>(fut, doneCb)); - - return fut; + try { + return new GridFinishedFuture<>(doneCb.apply(this)); + } + catch (GridClosureException e) { + return new GridFinishedFuture<>(e.unwrap()); + } + catch (RuntimeException | Error e) { + return new GridFinishedFuture<>(e); + } } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7dbe2a39/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java index 89ef9fc..efb46e4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java @@ -82,17 +82,13 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements return endTime; } - /** - * @return Value of error. - */ - protected Throwable error() { + /** {@inheritDoc} */ + @Override public Throwable error() { return (resFlag == ERR) ? (Throwable)res : null; } - /** - * @return Value of result. - */ - protected R result() { + /** {@inheritDoc} */ + @Override public R result() { return resFlag == RES ? (R)res : null; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7dbe2a39/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioEmbeddedFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioEmbeddedFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioEmbeddedFuture.java index 0c50e9e..32c2adb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioEmbeddedFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioEmbeddedFuture.java @@ -18,12 +18,11 @@ package org.apache.ignite.internal.util.nio; import org.apache.ignite.*; +import org.apache.ignite.internal.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; import org.jetbrains.annotations.*; -import java.io.*; - /** * Future that delegates to some other future. */ @@ -50,12 +49,12 @@ public class GridNioEmbeddedFuture<R> extends GridNioFutureImpl<R> { if (err != null) onDone(err); - else delegate.listen(new IgniteInClosure<GridNioFuture<R>>() { - @Override public void apply(GridNioFuture<R> t) { + else delegate.listen(new IgniteInClosure<IgniteInternalFuture<R>>() { + @Override public void apply(IgniteInternalFuture<R> t) { try { onDone(t.get()); } - catch (IOException | IgniteCheckedException e) { + catch (IgniteCheckedException e) { onDone(e); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7dbe2a39/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFinishedFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFinishedFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFinishedFuture.java index f8d73d8..9029dd2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFinishedFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFinishedFuture.java @@ -17,86 +17,28 @@ package org.apache.ignite.internal.util.nio; -import org.apache.ignite.*; +import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.concurrent.*; /** * Future that represents already completed result. */ -public class GridNioFinishedFuture<R> implements GridNioFuture<R> { - /** Future result. */ - private R res; - - /** Future exception. */ - private Throwable err; - +public class GridNioFinishedFuture<R> extends GridFinishedFuture<R> implements GridNioFuture<R> { /** Message thread flag. */ private boolean msgThread; /** - * Constructs a future which {@link #get()} method will return a given result. - * - * @param res Future result. + * @param res Result. */ public GridNioFinishedFuture(R res) { - this.res = res; + super(res); } /** - * Constructs a future which {@link #get()} method will throw given exception. - * - * @param err Exception to be thrown. + * @param err Error. */ - public GridNioFinishedFuture(@Nullable Throwable err) { - this.err = err; - } - - /** {@inheritDoc} */ - @Override public R get() throws IOException, IgniteCheckedException { - if (err != null) { - if (err instanceof IOException) - throw (IOException)err; - - throw U.cast(err); - } - - return res; - } - - /** {@inheritDoc} */ - @Override public R get(long timeout) throws IOException, IgniteCheckedException { - return get(); - } - - /** {@inheritDoc} */ - @Override public R get(long timeout, TimeUnit unit) throws IOException, IgniteCheckedException { - return get(); - } - - /** {@inheritDoc} */ - @Override public boolean cancel() throws IgniteCheckedException { - return false; - } - - /** {@inheritDoc} */ - @Override public boolean isDone() { - return true; - } - - /** {@inheritDoc} */ - @Override public boolean isCancelled() { - return false; - } - - /** {@inheritDoc} */ - @Override public void listen(@Nullable IgniteInClosure<? super GridNioFuture<R>> lsnr) { - if (lsnr != null) - lsnr.apply(this); + public GridNioFinishedFuture(Throwable err) { + super(err); } /** {@inheritDoc} */ @@ -113,4 +55,9 @@ public class GridNioFinishedFuture<R> implements GridNioFuture<R> { @Override public boolean skipRecovery() { return true; } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridNioFinishedFuture.class, this, super.toString()); + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7dbe2a39/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java index 8bc1834..7101f45 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java @@ -17,90 +17,12 @@ package org.apache.ignite.internal.util.nio; -import org.apache.ignite.*; -import org.apache.ignite.lang.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.concurrent.*; +import org.apache.ignite.internal.*; /** - * Reduced variant of {@link org.apache.ignite.internal.IgniteInternalFuture} interface. Removed asynchronous - * listen methods which require a valid grid kernal context. - * @param <R> Type of the result for the future. + * NIO future. */ -public interface GridNioFuture<R> { - /** - * Synchronously waits for completion of the operation and - * returns operation result. - * - * @return Operation result. - * @throws org.apache.ignite.internal.IgniteInterruptedCheckedException Subclass of {@link IgniteCheckedException} thrown if the wait was interrupted. - * @throws org.apache.ignite.internal.IgniteFutureCancelledCheckedException Subclass of {@link IgniteCheckedException} throws if operation was cancelled. - * @throws IgniteCheckedException If operation failed. - * @throws IOException If IOException occurred while performing operation. - */ - public R get() throws IOException, IgniteCheckedException; - - /** - * Synchronously waits for completion of the operation for - * up to the timeout specified and returns operation result. - * This method is equivalent to calling {@link #get(long, TimeUnit) get(long, TimeUnit.MILLISECONDS)}. - * - * @param timeout The maximum time to wait in milliseconds. - * @return Operation result. - * @throws org.apache.ignite.internal.IgniteInterruptedCheckedException Subclass of {@link IgniteCheckedException} thrown if the wait was interrupted. - * @throws org.apache.ignite.internal.IgniteFutureTimeoutCheckedException Subclass of {@link IgniteCheckedException} thrown if the wait was timed out. - * @throws org.apache.ignite.internal.IgniteFutureCancelledCheckedException Subclass of {@link IgniteCheckedException} throws if operation was cancelled. - * @throws IgniteCheckedException If operation failed. - * @throws IOException If IOException occurred while performing operation. - */ - public R get(long timeout) throws IOException, IgniteCheckedException; - - /** - * Synchronously waits for completion of the operation for - * up to the timeout specified and returns operation result. - * - * @param timeout The maximum time to wait. - * @param unit The time unit of the {@code timeout} argument. - * @return Operation result. - * @throws org.apache.ignite.internal.IgniteInterruptedCheckedException Subclass of {@link IgniteCheckedException} thrown if the wait was interrupted. - * @throws org.apache.ignite.internal.IgniteFutureTimeoutCheckedException Subclass of {@link IgniteCheckedException} thrown if the wait was timed out. - * @throws org.apache.ignite.internal.IgniteFutureCancelledCheckedException Subclass of {@link IgniteCheckedException} throws if operation was cancelled. - * @throws IgniteCheckedException If operation failed. - * @throws IOException If IOException occurred while performing operation. - */ - public R get(long timeout, TimeUnit unit) throws IOException, IgniteCheckedException; - - /** - * Cancels this future. - * - * @return {@code True} if future was canceled (i.e. was not finished prior to this call). - * @throws IgniteCheckedException If cancellation failed. - */ - public boolean cancel() throws IgniteCheckedException; - - /** - * Checks if operation is done. - * - * @return {@code True} if operation is done, {@code false} otherwise. - */ - public boolean isDone(); - - /** - * Returns {@code true} if this operation was cancelled before it completed normally. - * - * @return {@code True} if this operation was cancelled before it completed normally. - */ - public boolean isCancelled(); - - /** - * Registers listener closure to be asynchronously notified whenever future completes. - * - * @param lsnr Listener closure to register. If not provided - this method is no-op. - */ - public void listen(@Nullable IgniteInClosure<? super GridNioFuture<R>> lsnr); - +public interface GridNioFuture<R> extends IgniteInternalFuture<R> { /** * Sets flag indicating that message send future was created in thread that was processing a message. * http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7dbe2a39/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFutureImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFutureImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFutureImpl.java index bd30820..311a66d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFutureImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFutureImpl.java @@ -17,172 +17,16 @@ package org.apache.ignite.internal.util.nio; -import org.apache.ignite.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.locks.*; /** * Default future implementation. */ -public class GridNioFutureImpl<R> extends AbstractQueuedSynchronizer implements GridNioFuture<R> { - /** Initial state. */ - private static final int INIT = 0; - - /** Cancelled state. */ - private static final int CANCELLED = 1; - - /** Done state. */ - private static final int DONE = 2; - - /** Result. */ - @GridToStringInclude - private R res; - - /** Error. */ - private Throwable err; - - /** Future start time. */ - protected final long startTime = U.currentTimeMillis(); - - /** */ - protected volatile long endTime; - +public class GridNioFutureImpl<R> extends GridFutureAdapter<R> implements GridNioFuture<R> { /** */ protected boolean msgThread; - /** Asynchronous listeners. */ - private Collection<IgniteInClosure<? super GridNioFuture<R>>> lsnrs; - - /** */ - private final Object mux = new Object(); - - /** - * @return Value of error. - */ - protected Throwable error() { - return err; - } - - /** - * @return Value of result. - */ - protected R result() { - return res; - } - - /** {@inheritDoc} */ - @Override public R get() throws IOException, IgniteCheckedException { - try { - if (endTime == 0) - acquireSharedInterruptibly(0); - - if (getState() == CANCELLED) - throw new IgniteFutureCancelledCheckedException("Future was cancelled: " + this); - - if (err != null) - throw U.cast(err); - - return res; - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - - throw new IgniteInterruptedCheckedException(e); - } - } - - /** {@inheritDoc} */ - @Override public R get(long timeout) throws IOException, IgniteCheckedException { - // Do not replace with static import, as it may not compile. - return get(timeout, TimeUnit.MILLISECONDS); - } - - /** {@inheritDoc} */ - @Override public R get(long timeout, TimeUnit unit) throws IOException, IgniteCheckedException { - A.ensure(timeout >= 0, "timeout cannot be negative: " + timeout); - A.notNull(unit, "unit"); - - try { - return get0(unit.toNanos(timeout)); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - - throw new IgniteInterruptedCheckedException("Got interrupted while waiting for future to complete.", e); - } - } - - /** - * @param nanosTimeout Timeout (nanoseconds). - * @return Result. - * @throws InterruptedException If interrupted. - * @throws org.apache.ignite.internal.IgniteFutureTimeoutCheckedException If timeout reached before computation completed. - * @throws IgniteCheckedException If error occurred. - */ - @Nullable protected R get0(long nanosTimeout) throws InterruptedException, IgniteCheckedException { - if (endTime == 0 && !tryAcquireSharedNanos(0, nanosTimeout)) - throw new IgniteFutureTimeoutCheckedException("Timeout was reached before computation completed."); - - if (getState() == CANCELLED) - throw new IgniteFutureCancelledCheckedException("Future was cancelled: " + this); - - if (err != null) - throw U.cast(err); - - return res; - } - - /** - * Default no-op implementation that always returns {@code false}. - * Futures that do support cancellation should override this method - * and call {@link #onCancelled()} callback explicitly if cancellation - * indeed did happen. - */ - @Override public boolean cancel() throws IgniteCheckedException { - return false; - } - - /** {@inheritDoc} */ - @Override public boolean isDone() { - return getState() != INIT; - } - - /** {@inheritDoc} */ - @Override public boolean isCancelled() { - return getState() == CANCELLED; - } - - /** {@inheritDoc} */ - @Override public void listen(@Nullable final IgniteInClosure<? super GridNioFuture<R>> lsnr) { - if (lsnr != null) { - boolean done = isDone(); - - if (!done) { - synchronized (mux) { - done = isDone(); // Double check. - - if (!done) { - if (lsnrs == null) - lsnrs = new ArrayList<>(); - - lsnrs.add(lsnr); - } - } - } - - if (done) - lsnr.apply(this); - } - } - /** {@inheritDoc} */ @Override public void messageThread(boolean msgThread) { this.msgThread = msgThread; @@ -193,125 +37,6 @@ public class GridNioFutureImpl<R> extends AbstractQueuedSynchronizer implements return msgThread; } - /** - * Notifies all registered listeners. - */ - private void notifyListeners() { - final Collection<IgniteInClosure<? super GridNioFuture<R>>> lsnrs0; - - synchronized (mux) { - lsnrs0 = lsnrs; - - if (lsnrs0 == null) - return; - - lsnrs = null; - } - - assert !lsnrs0.isEmpty(); - - for (IgniteInClosure<? super GridNioFuture<R>> lsnr : lsnrs0) - lsnr.apply(this); - } - - /** - * Callback to notify that future is finished with {@code null} result. - * This method must delegate to {@link #onDone(Object, Throwable)} method. - * - * @return {@code True} if result was set by this call. - */ - public final boolean onDone() { - return onDone(null, null); - } - - /** - * Callback to notify that future is finished. - * This method must delegate to {@link #onDone(Object, Throwable)} method. - * - * @param res Result. - * @return {@code True} if result was set by this call. - */ - public final boolean onDone(@Nullable R res) { - return onDone(res, null); - } - - /** - * Callback to notify that future is finished. - * This method must delegate to {@link #onDone(Object, Throwable)} method. - * - * @param err Error. - * @return {@code True} if result was set by this call. - */ - public final boolean onDone(@Nullable Throwable err) { - return onDone(null, err); - } - - /** - * Callback to notify that future is finished. Note that if non-{@code null} exception is passed in - * the result value will be ignored. - * - * @param res Optional result. - * @param err Optional error. - * @return {@code True} if result was set by this call. - */ - public boolean onDone(@Nullable R res, @Nullable Throwable err) { - return onDone(res, err, false); - } - - /** - * Callback to notify that future is finished. Note that if non-{@code null} exception is passed in - * the result value will be ignored. - * - * @param res Optional result. - * @param err Optional error. - * @param cancel {@code True} if future was cancelled. - * @return {@code True} if result was set by this call. - */ - private boolean onDone(@Nullable R res, @Nullable Throwable err, boolean cancel) { - boolean notify = false; - - try { - if (compareAndSetState(INIT, cancel ? CANCELLED : DONE)) { - this.res = res; - this.err = err; - - notify = true; - - releaseShared(0); - - return true; - } - - return false; - } - finally { - if (notify) - notifyListeners(); - } - } - - /** - * Callback to notify that future is cancelled. - * - * @return {@code True} if cancel flag was set by this call. - */ - public boolean onCancelled() { - return onDone(null, null, true); - } - - /** {@inheritDoc} */ - @Override protected final int tryAcquireShared(int ignore) { - return endTime != 0 ? 1 : -1; - } - - /** {@inheritDoc} */ - @Override protected final boolean tryReleaseShared(int ignore) { - endTime = U.currentTimeMillis(); - - // Always signal after setting final done status. - return true; - } - /** {@inheritDoc} */ @Override public boolean skipRecovery() { return false; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7dbe2a39/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java index 6c94fb1..a884dfb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java @@ -408,7 +408,7 @@ public class GridNioServer<T> { */ public GridNioFuture<?> sendSystem(GridNioSession ses, Message msg, - @Nullable IgniteInClosure<? super GridNioFuture<?>> lsnr) { + @Nullable IgniteInClosure<? super IgniteInternalFuture<?>> lsnr) { assert ses instanceof GridSelectorNioSessionImpl; GridSelectorNioSessionImpl impl = (GridSelectorNioSessionImpl)ses; @@ -1766,9 +1766,6 @@ public class GridNioServer<T> { * Class for requesting write and session close operations. */ private static class NioOperationFuture<R> extends GridNioFutureImpl<R> { - /** */ - private static final long serialVersionUID = 0L; - /** Socket channel in register request. */ @GridToStringExclude private SocketChannel sockCh; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7dbe2a39/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java index d6006cc..788a8e6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java @@ -92,14 +92,8 @@ public class GridTcpNioCommunicationClient extends GridAbstractCommunicationClie GridNioFuture<?> fut = ses.send(data); - if (fut.isDone()) { - try { - fut.get(); - } - catch (IOException e) { - throw new IgniteCheckedException("Failed to send message [client=" + this + ']', e); - } - } + if (fut.isDone()) + fut.get(); } /** {@inheritDoc} */ @@ -114,15 +108,9 @@ public class GridTcpNioCommunicationClient extends GridAbstractCommunicationClie try { fut.get(); } - catch (IOException e) { - if (log.isDebugEnabled()) - log.debug("Failed to send message [client=" + this + ", err=" +e + ']'); - - return true; - } catch (IgniteCheckedException e) { if (log.isDebugEnabled()) - log.debug("Failed to send message [client=" + this + ", err=" +e + ']'); + log.debug("Failed to send message [client=" + this + ", err=" + e + ']'); if (e.getCause() instanceof IOException) return true; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7dbe2a39/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index e8fe1af..5f46f9e 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -575,8 +575,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** {@inheritDoc} */ @Override public void apply(Boolean success) { if (success) { - IgniteInClosure<GridNioFuture<?>> lsnr = new IgniteInClosure<GridNioFuture<?>>() { - @Override public void apply(GridNioFuture<?> msgFut) { + IgniteInClosure<IgniteInternalFuture<?>> lsnr = new IgniteInClosure<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> msgFut) { try { msgFut.get(); @@ -585,7 +585,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter fut.onDone(client); } - catch (IgniteCheckedException | IOException e) { + catch (IgniteCheckedException e) { if (log.isDebugEnabled()) log.debug("Failed to send recovery handshake " + "[rmtNode=" + rmtNode.id() + ", err=" + e + ']'); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7dbe2a39/modules/core/src/test/java/org/apache/ignite/internal/util/future/nio/GridNioFutureSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/future/nio/GridNioFutureSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/future/nio/GridNioFutureSelfTest.java index 18c0119..ca9c390 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/util/future/nio/GridNioFutureSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/future/nio/GridNioFutureSelfTest.java @@ -123,8 +123,8 @@ public class GridNioFutureSelfTest extends GridCommonAbstractTest { final AtomicReference<Exception> err = new AtomicReference<>(); for (int i = 0; i < lsnrCnt; i++) { - fut.listen(new CI1<GridNioFuture<String>>() { - @Override public void apply(GridNioFuture<String> t) { + fut.listen(new CI1<IgniteInternalFuture<String>>() { + @Override public void apply(IgniteInternalFuture<String> t) { if (Thread.currentThread() != runThread) err.compareAndSet(null, new Exception("Wrong notification thread: " + Thread.currentThread())); @@ -144,8 +144,8 @@ public class GridNioFutureSelfTest extends GridCommonAbstractTest { err.set(null); - fut.listen(new CI1<GridNioFuture<String>>() { - @Override public void apply(GridNioFuture<String> t) { + fut.listen(new CI1<IgniteInternalFuture<String>>() { + @Override public void apply(IgniteInternalFuture<String> t) { if (Thread.currentThread() != runThread) err.compareAndSet(null, new Exception("Wrong notification thread: " + Thread.currentThread())); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7dbe2a39/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunication.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunication.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunication.java index 38aee5a..e81ce9d 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunication.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunication.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication; import org.apache.ignite.*; +import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.hadoop.message.*; import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.*; import org.apache.ignite.internal.util.*; @@ -1243,13 +1244,13 @@ public class HadoopExternalCommunication { log.debug("Accepted connection, initiating handshake: " + ses); // Server initiates handshake. - ses.send(locIdMsg).listen(new CI1<GridNioFuture<?>>() { - @Override public void apply(GridNioFuture<?> fut) { + ses.send(locIdMsg).listen(new CI1<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> fut) { try { // Make sure there were no errors. fut.get(); } - catch (IgniteCheckedException | IOException e) { + catch (IgniteCheckedException e) { log.warning("Failed to send handshake message, will close session: " + ses, e); ses.close(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7dbe2a39/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopTcpNioCommunicationClient.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopTcpNioCommunicationClient.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopTcpNioCommunicationClient.java index c4d1c54..9de097b 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopTcpNioCommunicationClient.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopTcpNioCommunicationClient.java @@ -23,8 +23,6 @@ import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.*; import org.apache.ignite.internal.util.nio.*; import org.apache.ignite.internal.util.typedef.internal.*; -import java.io.*; - /** * Grid client for NIO server. */ @@ -73,14 +71,8 @@ public class HadoopTcpNioCommunicationClient extends HadoopAbstractCommunication GridNioFuture<?> fut = ses.send(msg); - if (fut.isDone()) { - try { - fut.get(); - } - catch (IOException e) { - throw new IgniteCheckedException("Failed to send message [client=" + this + ']', e); - } - } + if (fut.isDone()) + fut.get(); } /** {@inheritDoc} */