Repository: incubator-ignite Updated Branches: refs/heads/ignite-418 8c4d6ac30 -> 2b48860be
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpMemcachedNioListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpMemcachedNioListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpMemcachedNioListener.java index aa1abd8..3259f53 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpMemcachedNioListener.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpMemcachedNioListener.java @@ -166,7 +166,7 @@ public class GridTcpMemcachedNioListener extends GridNioServerListenerAdapter<Gr IgniteInternalFuture<GridRestResponse> f = hnd.handleAsync(createRestRequest(req, cmd.get1())); - f.listenAsync(new CIX1<IgniteInternalFuture<GridRestResponse>>() { + f.listen(new CIX1<IgniteInternalFuture<GridRestResponse>>() { @Override public void applyx(IgniteInternalFuture<GridRestResponse> f) throws IgniteCheckedException { GridRestResponse restRes = f.get(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestNioListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestNioListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestNioListener.java index 52b3123..2cd50e5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestNioListener.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestNioListener.java @@ -173,7 +173,7 @@ public class GridTcpRestNioListener extends GridNioServerListenerAdapter<GridCli final GridRestRequest req = createRestRequest(ses, msg); if (req != null) - hnd.handleAsync(req).listenAsync(new CI1<IgniteInternalFuture<GridRestResponse>>() { + hnd.handleAsync(req).listen(new CI1<IgniteInternalFuture<GridRestResponse>>() { @Override public void apply(IgniteInternalFuture<GridRestResponse> fut) { GridClientResponse res = new GridClientResponse(); @@ -213,8 +213,8 @@ public class GridTcpRestNioListener extends GridNioServerListenerAdapter<GridCli U.error(log, "Failed to process client request [ses=" + ses + ", msg=" + msg + ']', e); } - } - }); + } + }); else U.error(log, "Failed to process client request (unknown packet type) [ses=" + ses + ", msg=" + msg + ']'); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/IgniteStreamerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/IgniteStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/IgniteStreamerImpl.java index 1a11123..54c032e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/IgniteStreamerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/IgniteStreamerImpl.java @@ -730,7 +730,7 @@ public class IgniteStreamerImpl implements IgniteStreamerEx, Externalizable { execSvc.submit(worker); - batchFut.listenAsync(new CI1<IgniteInternalFuture<Object>>() { + batchFut.listen(new CI1<IgniteInternalFuture<Object>>() { @Override public void apply(IgniteInternalFuture<Object> t) { BatchExecutionFuture fut = (BatchExecutionFuture)t; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java index 6b12554..89bb6f2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java @@ -7108,7 +7108,7 @@ public abstract class IgniteUtils { */ public static void asyncLogError(IgniteInternalFuture<?> f, final IgniteLogger log) { if (f != null) - f.listenAsync(new CI1<IgniteInternalFuture<?>>() { + f.listen(new CI1<IgniteInternalFuture<?>>() { @Override public void apply(IgniteInternalFuture<?> f) { try { f.get(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java index 0785aba..53eb13a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java @@ -158,7 +158,7 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> { pending.add(fut); futs.add(fut); - fut.listenAsync(new Listener()); + fut.listen(new Listener()); if (isCancelled()) try { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridEmbeddedFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridEmbeddedFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridEmbeddedFuture.java index 7da6423..bbe104e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridEmbeddedFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridEmbeddedFuture.java @@ -45,13 +45,13 @@ public class GridEmbeddedFuture<A, B> extends GridFutureAdapter<A> { this.embedded = embedded; - embedded.listenAsync(new AL1() { + embedded.listen(new AL1() { @SuppressWarnings({"ErrorNotRethrown", "CatchGenericClass"}) @Override public void applyx(IgniteInternalFuture<B> embedded) { try { onDone(c.apply(embedded.get(), null)); } - catch (IgniteCheckedException| RuntimeException e) { + catch (IgniteCheckedException | RuntimeException e) { onDone(c.apply(null, e)); } catch (Error e) { @@ -77,7 +77,7 @@ public class GridEmbeddedFuture<A, B> extends GridFutureAdapter<A> { this.embedded = embedded; - embedded.listenAsync(new AL1() { + embedded.listen(new AL1() { @Override public void applyx(IgniteInternalFuture<B> embedded) { try { IgniteInternalFuture<A> next = c.apply(embedded.get(), null); @@ -88,7 +88,7 @@ public class GridEmbeddedFuture<A, B> extends GridFutureAdapter<A> { return; } - next.listenAsync(new AL2() { + next.listen(new AL2() { @Override public void applyx(IgniteInternalFuture<A> next) { try { onDone(next.get()); @@ -144,7 +144,7 @@ public class GridEmbeddedFuture<A, B> extends GridFutureAdapter<A> { this.embedded = embedded; - embedded.listenAsync(new AL1() { + embedded.listen(new AL1() { @Override public void applyx(IgniteInternalFuture<B> embedded) { try { IgniteInternalFuture<A> next = c1.apply(embedded.get(), null); @@ -155,7 +155,7 @@ public class GridEmbeddedFuture<A, B> extends GridFutureAdapter<A> { return; } - next.listenAsync(new AL2() { + next.listen(new AL2() { @Override public void applyx(IgniteInternalFuture<A> next) { try { onDone(c2.apply(next.get(), null)); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java index 31f6734..691743f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java @@ -118,7 +118,7 @@ public class GridFinishedFuture<T> implements IgniteInternalFuture<T> { } /** {@inheritDoc} */ - @Override public void listenAsync(final IgniteInClosure<? super IgniteInternalFuture<T>> lsnr) { + @Override public void listen(final IgniteInClosure<? super IgniteInternalFuture<T>> lsnr) { if (lsnr != null) lsnr.apply(this); } @@ -131,7 +131,7 @@ public class GridFinishedFuture<T> implements IgniteInternalFuture<T> { } }; - listenAsync(new GridFutureChainListener<>(fut, doneCb)); + listen(new GridFutureChainListener<>(fut, doneCb)); return fut; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java index 30aa855..5246067 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java @@ -163,7 +163,7 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements } /** {@inheritDoc} */ - @Override public void listenAsync(@Nullable final IgniteInClosure<? super IgniteInternalFuture<R>> lsnr0) { + @Override public void listen(@Nullable final IgniteInClosure<? super IgniteInternalFuture<R>> lsnr0) { if (lsnr0 != null) { boolean done = isDone(); @@ -443,7 +443,7 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements this.fut = fut; this.doneCb = doneCb; - fut.listenAsync(new GridFutureChainListener<>(this, doneCb)); + fut.listen(new GridFutureChainListener<>(this, doneCb)); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/core/src/main/java/org/apache/ignite/internal/util/future/IgniteFutureImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/IgniteFutureImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/IgniteFutureImpl.java index d6301b8..05858eb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/IgniteFutureImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/IgniteFutureImpl.java @@ -73,7 +73,7 @@ public class IgniteFutureImpl<V> implements IgniteFuture<V> { /** {@inheritDoc} */ @Override public void listen(@Nullable final IgniteInClosure<? super IgniteFuture<V>> lsnr) { if (lsnr != null) - fut.listenAsync(new InternalFutureListener(lsnr)); + fut.listen(new InternalFutureListener(lsnr)); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java index 5b86fc1..dff4638 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java @@ -8503,7 +8503,7 @@ public class GridFunc { }; } - fut.listenAsync(c); + fut.listen(c); } else return fut; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridPlainFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridPlainFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridPlainFuture.java deleted file mode 100644 index e75d74e..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridPlainFuture.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.util.lang; - -import org.apache.ignite.*; - -import java.util.concurrent.*; - -/** - * Future that does not depend on kernal context. - */ -public interface GridPlainFuture<R> { - /** - * Synchronously waits for completion and returns result. - * - * @return Completed future result. - * @throws IgniteCheckedException In case of error. - */ - public R get() throws IgniteCheckedException; - - /** - * Synchronously waits for completion and returns result. - * - * @param timeout Timeout interval to wait future completes. - * @param unit Timeout interval unit to wait future completes. - * @return Completed future result. - * @throws IgniteCheckedException In case of error. - * @throws org.apache.ignite.internal.IgniteFutureTimeoutCheckedException If timed out before future finishes. - */ - public R get(long timeout, TimeUnit unit) throws IgniteCheckedException; - - /** - * Checks if future is done. - * - * @return Whether future is done. - */ - public boolean isDone(); - - /** - * Register new listeners for notification when future completes. - * - * Note that current implementations are calling listeners in - * the completing thread. - * - * @param lsnrs Listeners to be registered. - */ - public void listenAsync(GridPlainInClosure<GridPlainFuture<R>>... lsnrs); - - /** - * Removes listeners registered before. - * - * @param lsnrs Listeners to be removed. - */ - public void stopListenAsync(GridPlainInClosure<GridPlainFuture<R>>... lsnrs); - - /** - * Creates a future that will be completed after this future is completed. The result of - * created future is value returned by {@code cb} closure invoked on this future. - * - * @param cb Callback closure. - * @return Chained future. - */ - public <T> GridPlainFuture<T> chain(GridPlainClosure<GridPlainFuture<R>, T> cb); -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridPlainFutureAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridPlainFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridPlainFutureAdapter.java deleted file mode 100644 index 8475a5f..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridPlainFutureAdapter.java +++ /dev/null @@ -1,299 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.util.lang; - -import org.apache.ignite.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -/** - * Plain future adapter. - */ -public class GridPlainFutureAdapter<R> implements GridPlainFuture<R> { - /** This future done callbacks. */ - private final ConcurrentLinkedQueue<DoneCallback> cbs = new ConcurrentLinkedQueue<>(); - - /** Done flag. */ - private final AtomicBoolean done = new AtomicBoolean(false); - - /** Latch. */ - private final CountDownLatch doneLatch = new CountDownLatch(1); - - /** Result. */ - private R res; - - /** Error. */ - private Throwable err; - - /** - * Creates not-finished future without any result. - */ - public GridPlainFutureAdapter() { - // No-op. - } - - /** - * Creates succeeded finished future with given result. - * - * @param res Future result. - */ - public GridPlainFutureAdapter(R res) { - onDone(res); - } - - /** - * Creates failed finished future with given error. - * - * @param err Future error. - */ - public GridPlainFutureAdapter(Throwable err) { - onDone(err); - } - - /** {@inheritDoc} */ - @Override public R get() throws IgniteCheckedException { - try { - if (doneLatch.getCount() > 0) - doneLatch.await(); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - - throw new IgniteCheckedException("Operation was interrupted.", e); - } - - return getResult(); - } - - /** {@inheritDoc} */ - @Override public R get(long timeout, TimeUnit unit) throws IgniteCheckedException { - A.ensure(timeout >= 0, "timeout >= 0"); - - try { - if (doneLatch.getCount() > 0 && !doneLatch.await(timeout, unit)) - throw new IgniteFutureTimeoutCheckedException("Failed to get future result due to waiting timed out."); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - - throw new IgniteCheckedException("Operation was interrupted.", e); - } - - return getResult(); - } - - /** - * Get future result. - * - * @return Future result. - * @throws IgniteCheckedException In case of error. - */ - private R getResult() throws IgniteCheckedException { - assert doneLatch.getCount() == 0; - - if (err == null) - return res; - - if (err instanceof Error) - throw (Error)err; - - if (err instanceof IgniteCheckedException) - throw (IgniteCheckedException)err; - - throw new IgniteCheckedException(err); - } - - /** {@inheritDoc} */ - @Override public boolean isDone() { - return done.get(); - } - - /** - * Callback to notify that future is finished successfully. - * - * @param res Result (can be {@code null}). - */ - public void onDone(R res) { - if (done.compareAndSet(false, true)) { - this.res = res; - - doneLatch.countDown(); - - fireDone(); - } - } - - /** - * Callback to notify that future is finished with error. - * - * @param err Error (can't be {@code null}). - */ - public void onDone(Throwable err) { - assert err != null; - - if (done.compareAndSet(false, true)) { - this.err = err; - - doneLatch.countDown(); - - fireDone(); - } - } - - /** - * Register new listeners for notification when future completes. - * - * Note that current implementations are calling listeners in - * the completing thread. - * - * @param lsnrs Listeners to be registered. - */ - @Override public void listenAsync(final GridPlainInClosure<GridPlainFuture<R>>... lsnrs) { - assert lsnrs != null; - - for (GridPlainInClosure<GridPlainFuture<R>> lsnr : lsnrs) - cbs.add(new DoneCallback<R>(null, lsnr, null)); - - if (isDone()) - fireDone(); - } - - /** - * Removes listeners registered before. - * - * @param lsnrs Listeners to be removed. - */ - @Override public void stopListenAsync(GridPlainInClosure<GridPlainFuture<R>>... lsnrs) { - Collection<GridPlainInClosure<GridPlainFuture<R>>> lsnrsCol = lsnrs == null ? null : Arrays.asList(lsnrs); - - for (Iterator<DoneCallback> it = cbs.iterator(); it.hasNext();) { - DoneCallback cb = it.next(); - - if (cb.lsnr == null) - continue; - - // Remove all listeners, if passed listeners collection is 'null'. - if (lsnrsCol == null || lsnrsCol.contains(cb.lsnr)) - it.remove(); - } - } - - /** - * Creates future's chain and completes chained future, when this future finishes. - * - * @param cb Future callback to convert this future result into expected format. - * @param <T> New future format to convert this finished future to. - * @return Chained future with new format. - */ - @Override public <T> GridPlainFutureAdapter<T> chain(GridPlainClosure<GridPlainFuture<R>, T> cb) { - GridPlainFutureAdapter<T> fut = new GridPlainFutureAdapter<>(); - - cbs.add(new DoneCallback<>(cb, null, fut)); - - if (isDone()) - fireDone(); - - return fut; - } - - /** - * Fire event this future has been finished. - */ - @SuppressWarnings("ErrorNotRethrown") - private void fireDone() { - assert isDone(); - - DoneCallback cb; - - Error err = null; - - while ((cb = cbs.poll()) != null) - try { - cb.proceed(); - } - catch (Error e) { - if (err == null) - err = e; - } - - if (err != null) - throw err; - } - - /** This future finished notification callback. */ - private class DoneCallback<T> { - /** Done future callback. */ - private final GridPlainClosure<GridPlainFuture<R>, T> cb; - - /** Done future listener. */ - private final GridPlainInClosure<GridPlainFuture<R>> lsnr; - - /** Chained future. */ - private final GridPlainFutureAdapter<T> chainedFut; - - /** - * Constructs future finished notification callback. - * - * @param cb Future finished callback. - * @param chainedFut Chained future to set callback result to. - */ - private DoneCallback(GridPlainClosure<GridPlainFuture<R>, T> cb, GridPlainInClosure<GridPlainFuture<R>> lsnr, - GridPlainFutureAdapter<T> chainedFut) { - this.cb = cb; - this.lsnr = lsnr; - this.chainedFut = chainedFut; - } - - /** - * Proceed this future callback. - */ - public void proceed() { - GridPlainFutureAdapter<R> fut = GridPlainFutureAdapter.this; - - assert fut.isDone(); - - try { - if (lsnr != null) - lsnr.apply(fut); - - T res = null; - - if (cb != null) - res = cb.apply(fut); - - if (chainedFut != null) - chainedFut.onDone(res); - } - catch (IgniteCheckedException | RuntimeException e) { - if (chainedFut != null) - chainedFut.onDone(e); - } - catch (Error e) { - if (chainedFut != null) - chainedFut.onDone(e); - - throw e; - } - } - } -} - http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioEmbeddedFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioEmbeddedFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioEmbeddedFuture.java index 7311c87..0c50e9e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioEmbeddedFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioEmbeddedFuture.java @@ -50,7 +50,7 @@ public class GridNioEmbeddedFuture<R> extends GridNioFutureImpl<R> { if (err != null) onDone(err); - else delegate.listenAsync(new IgniteInClosure<GridNioFuture<R>>() { + else delegate.listen(new IgniteInClosure<GridNioFuture<R>>() { @Override public void apply(GridNioFuture<R> t) { try { onDone(t.get()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFinishedFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFinishedFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFinishedFuture.java index fa923f7..f8d73d8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFinishedFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFinishedFuture.java @@ -94,7 +94,7 @@ public class GridNioFinishedFuture<R> implements GridNioFuture<R> { } /** {@inheritDoc} */ - @Override public void listenAsync(@Nullable IgniteInClosure<? super GridNioFuture<R>> lsnr) { + @Override public void listen(@Nullable IgniteInClosure<? super GridNioFuture<R>> lsnr) { if (lsnr != null) lsnr.apply(this); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java index 33ddb00..8bc1834 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java @@ -99,7 +99,7 @@ public interface GridNioFuture<R> { * * @param lsnr Listener closure to register. If not provided - this method is no-op. */ - public void listenAsync(@Nullable IgniteInClosure<? super GridNioFuture<R>> lsnr); + public void listen(@Nullable IgniteInClosure<? super GridNioFuture<R>> lsnr); /** * Sets flag indicating that message send future was created in thread that was processing a message. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFutureImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFutureImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFutureImpl.java index f4c240e..bd30820 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFutureImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFutureImpl.java @@ -161,7 +161,7 @@ public class GridNioFutureImpl<R> extends AbstractQueuedSynchronizer implements } /** {@inheritDoc} */ - @Override public void listenAsync(@Nullable final IgniteInClosure<? super GridNioFuture<R>> lsnr) { + @Override public void listen(@Nullable final IgniteInClosure<? super GridNioFuture<R>> lsnr) { if (lsnr != null) { boolean done = isDone(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java index c13eb77..6c94fb1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java @@ -417,7 +417,7 @@ public class GridNioServer<T> { skipRecoveryPred.apply(msg)); if (lsnr != null) { - fut.listenAsync(lsnr); + fut.listen(lsnr); assert !fut.isDone(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheFinishPartitionsSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheFinishPartitionsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheFinishPartitionsSelfTest.java index 178ded8..3d25f08 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheFinishPartitionsSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheFinishPartitionsSelfTest.java @@ -134,7 +134,7 @@ public class GridCacheFinishPartitionsSelfTest extends GridCacheAbstractSelfTest IgniteInternalFuture<?> fut = grid.context().cache().context().partitionReleaseFuture(GRID_CNT + 1); - fut.listenAsync(new CI1<IgniteInternalFuture<?>>() { + fut.listen(new CI1<IgniteInternalFuture<?>>() { @Override public void apply(IgniteInternalFuture<?> e) { latch.countDown(); } @@ -235,7 +235,7 @@ public class GridCacheFinishPartitionsSelfTest extends GridCacheAbstractSelfTest assert fut != null; - fut.listenAsync(new CI1<IgniteInternalFuture<?>>() { + fut.listen(new CI1<IgniteInternalFuture<?>>() { @Override public void apply(IgniteInternalFuture<?> e) { end.set(System.currentTimeMillis()); @@ -295,7 +295,7 @@ public class GridCacheFinishPartitionsSelfTest extends GridCacheAbstractSelfTest assert fut != null; - fut.listenAsync(new CI1<IgniteInternalFuture<?>>() { + fut.listen(new CI1<IgniteInternalFuture<?>>() { @Override public void apply(IgniteInternalFuture<?> e) { end.set(System.currentTimeMillis()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java index 9a99611..080c647 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java @@ -23,7 +23,7 @@ import org.apache.ignite.configuration.*; import org.apache.ignite.igfs.*; import org.apache.ignite.igfs.secondary.*; import org.apache.ignite.internal.*; -import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; @@ -240,8 +240,8 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest { * @param task Task to execute. * @return Result. */ - protected static <T> GridPlainFuture<T> execute(final Callable<T> task) { - final GridPlainFutureAdapter<T> fut = new GridPlainFutureAdapter<>(); + protected static <T> IgniteInternalFuture<T> execute(final Callable<T> task) { + final GridFutureAdapter<T> fut = new GridFutureAdapter<>(); new Thread(new Runnable() { @Override public void run() { @@ -1525,7 +1525,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest { for (int i = 0; i < REPEAT_CNT; i++) { final CyclicBarrier barrier = new CyclicBarrier(2); - GridPlainFuture<Boolean> res1 = execute(new Callable<Boolean>() { + IgniteInternalFuture<Boolean> res1 = execute(new Callable<Boolean>() { @Override public Boolean call() throws Exception { U.awaitQuiet(barrier); @@ -1540,7 +1540,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest { } }); - GridPlainFuture<Boolean> res2 = execute(new Callable<Boolean>() { + IgniteInternalFuture<Boolean> res2 = execute(new Callable<Boolean>() { @Override public Boolean call() throws Exception { U.awaitQuiet(barrier); @@ -1575,7 +1575,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest { create(igfs, paths(DIR, SUBDIR, DIR_NEW), paths()); - GridPlainFuture<Boolean> res1 = execute(new Callable<Boolean>() { + IgniteInternalFuture<Boolean> res1 = execute(new Callable<Boolean>() { @Override public Boolean call() throws Exception { U.awaitQuiet(barrier); @@ -1590,7 +1590,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest { } }); - GridPlainFuture<Boolean> res2 = execute(new Callable<Boolean>() { + IgniteInternalFuture<Boolean> res2 = execute(new Callable<Boolean>() { @Override public Boolean call() throws Exception { U.awaitQuiet(barrier); @@ -1639,7 +1639,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest { create(igfs, paths(DIR, SUBDIR, DIR_NEW), paths()); - GridPlainFuture<Boolean> res1 = execute(new Callable<Boolean>() { + IgniteInternalFuture<Boolean> res1 = execute(new Callable<Boolean>() { @Override public Boolean call() throws Exception { U.awaitQuiet(barrier); @@ -1654,7 +1654,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest { } }); - GridPlainFuture<Boolean> res2 = execute(new Callable<Boolean>() { + IgniteInternalFuture<Boolean> res2 = execute(new Callable<Boolean>() { @Override public Boolean call() throws Exception { U.awaitQuiet(barrier); @@ -1695,7 +1695,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest { create(igfs, paths(DIR, SUBDIR, DIR_NEW), paths()); - GridPlainFuture<Boolean> res1 = execute(new Callable<Boolean>() { + IgniteInternalFuture<Boolean> res1 = execute(new Callable<Boolean>() { @Override public Boolean call() throws Exception { U.awaitQuiet(barrier); @@ -1710,7 +1710,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest { } }); - GridPlainFuture<Boolean> res2 = execute(new Callable<Boolean>() { + IgniteInternalFuture<Boolean> res2 = execute(new Callable<Boolean>() { @Override public Boolean call() throws Exception { U.awaitQuiet(barrier); @@ -1755,7 +1755,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest { create(igfs, paths(DIR, SUBDIR, SUBSUBDIR), paths()); - GridPlainFuture<Boolean> res1 = execute(new Callable<Boolean>() { + IgniteInternalFuture<Boolean> res1 = execute(new Callable<Boolean>() { @Override public Boolean call() throws Exception { U.awaitQuiet(barrier); @@ -1770,7 +1770,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest { } }); - GridPlainFuture<Boolean> res2 = execute(new Callable<Boolean>() { + IgniteInternalFuture<Boolean> res2 = execute(new Callable<Boolean>() { @Override public Boolean call() throws Exception { U.awaitQuiet(barrier); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsDualAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsDualAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsDualAbstractSelfTest.java index f44a988..1df79cc 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsDualAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsDualAbstractSelfTest.java @@ -20,7 +20,7 @@ package org.apache.ignite.internal.processors.igfs; import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.igfs.*; -import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.internal.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.testframework.*; @@ -1391,7 +1391,7 @@ public abstract class IgfsDualAbstractSelfTest extends IgfsAbstractSelfTest { create(igfsSecondary, paths(DIR, SUBDIR, DIR_NEW), paths()); - GridPlainFuture<Boolean> res1 = execute(new Callable<Boolean>() { + IgniteInternalFuture<Boolean> res1 = execute(new Callable<Boolean>() { @Override public Boolean call() throws Exception { U.awaitQuiet(barrier); @@ -1406,7 +1406,7 @@ public abstract class IgfsDualAbstractSelfTest extends IgfsAbstractSelfTest { } }); - GridPlainFuture<Boolean> res2 = execute(new Callable<Boolean>() { + IgniteInternalFuture<Boolean> res2 = execute(new Callable<Boolean>() { @Override public Boolean call() throws Exception { U.awaitQuiet(barrier); @@ -1447,7 +1447,7 @@ public abstract class IgfsDualAbstractSelfTest extends IgfsAbstractSelfTest { create(igfsSecondary, paths(DIR, SUBDIR, DIR_NEW), paths()); - GridPlainFuture<Boolean> res1 = execute(new Callable<Boolean>() { + IgniteInternalFuture<Boolean> res1 = execute(new Callable<Boolean>() { @Override public Boolean call() throws Exception { U.awaitQuiet(barrier); @@ -1462,7 +1462,7 @@ public abstract class IgfsDualAbstractSelfTest extends IgfsAbstractSelfTest { } }); - GridPlainFuture<Boolean> res2 = execute(new Callable<Boolean>() { + IgniteInternalFuture<Boolean> res2 = execute(new Callable<Boolean>() { @Override public Boolean call() throws Exception { U.awaitQuiet(barrier); @@ -1499,7 +1499,7 @@ public abstract class IgfsDualAbstractSelfTest extends IgfsAbstractSelfTest { create(igfsSecondary, paths(DIR, SUBDIR, DIR_NEW), paths()); - GridPlainFuture<Boolean> res1 = execute(new Callable<Boolean>() { + IgniteInternalFuture<Boolean> res1 = execute(new Callable<Boolean>() { @Override public Boolean call() throws Exception { U.awaitQuiet(barrier); @@ -1514,7 +1514,7 @@ public abstract class IgfsDualAbstractSelfTest extends IgfsAbstractSelfTest { } }); - GridPlainFuture<Boolean> res2 = execute(new Callable<Boolean>() { + IgniteInternalFuture<Boolean> res2 = execute(new Callable<Boolean>() { @Override public Boolean call() throws Exception { U.awaitQuiet(barrier); @@ -1559,7 +1559,7 @@ public abstract class IgfsDualAbstractSelfTest extends IgfsAbstractSelfTest { create(igfsSecondary, paths(DIR, SUBDIR, SUBSUBDIR), paths()); - GridPlainFuture<Boolean> res1 = execute(new Callable<Boolean>() { + IgniteInternalFuture<Boolean> res1 = execute(new Callable<Boolean>() { @Override public Boolean call() throws Exception { U.awaitQuiet(barrier); @@ -1574,7 +1574,7 @@ public abstract class IgfsDualAbstractSelfTest extends IgfsAbstractSelfTest { } }); - GridPlainFuture<Boolean> res2 = execute(new Callable<Boolean>() { + IgniteInternalFuture<Boolean> res2 = execute(new Callable<Boolean>() { @Override public Boolean call() throws Exception { U.awaitQuiet(barrier); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/core/src/test/java/org/apache/ignite/internal/util/future/GridFutureAdapterSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/future/GridFutureAdapterSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/future/GridFutureAdapterSelfTest.java index 0ed370c..34e55f9 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/util/future/GridFutureAdapterSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/future/GridFutureAdapterSelfTest.java @@ -124,7 +124,7 @@ public class GridFutureAdapterSelfTest extends GridCommonAbstractTest { final AtomicReference<Exception> err = new AtomicReference<>(); for (int i = 0; i < lsnrCnt; i++) { - fut.listenAsync(new CI1<IgniteInternalFuture<String>>() { + fut.listen(new CI1<IgniteInternalFuture<String>>() { @Override public void apply(IgniteInternalFuture<String> t) { if (Thread.currentThread() != runThread) err.compareAndSet(null, new Exception("Wrong notification thread: " + Thread.currentThread())); @@ -145,7 +145,7 @@ public class GridFutureAdapterSelfTest extends GridCommonAbstractTest { err.set(null); - fut.listenAsync(new CI1<IgniteInternalFuture<String>>() { + fut.listen(new CI1<IgniteInternalFuture<String>>() { @Override public void apply(IgniteInternalFuture<String> t) { if (Thread.currentThread() != runThread) err.compareAndSet(null, new Exception("Wrong notification thread: " + Thread.currentThread())); @@ -163,7 +163,7 @@ public class GridFutureAdapterSelfTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ - public void testListenAsyncNotify() throws Exception { + public void testListenNotify() throws Exception { GridTestKernalContext ctx = new GridTestKernalContext(log); ctx.setExecutorService(Executors.newFixedThreadPool(1)); @@ -185,7 +185,7 @@ public class GridFutureAdapterSelfTest extends GridCommonAbstractTest { final AtomicReference<Exception> err = new AtomicReference<>(); for (int i = 0; i < lsnrCnt; i++) { - fut.listenAsync(new CI1<IgniteInternalFuture<String>>() { + fut.listen(new CI1<IgniteInternalFuture<String>>() { @Override public void apply(IgniteInternalFuture<String> t) { if (Thread.currentThread() == runThread) err.compareAndSet(null, new Exception("Wrong notification thread: " + @@ -207,7 +207,7 @@ public class GridFutureAdapterSelfTest extends GridCommonAbstractTest { err.set(null); - fut.listenAsync(new CI1<IgniteInternalFuture<String>>() { + fut.listen(new CI1<IgniteInternalFuture<String>>() { @Override public void apply(IgniteInternalFuture<String> t) { if (Thread.currentThread() == runThread) err.compareAndSet(null, new Exception("Wrong notification thread: " + Thread.currentThread())); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/core/src/test/java/org/apache/ignite/internal/util/future/GridFutureListenPerformanceTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/future/GridFutureListenPerformanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/future/GridFutureListenPerformanceTest.java index f87e12d..b550845 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/util/future/GridFutureListenPerformanceTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/future/GridFutureListenPerformanceTest.java @@ -75,18 +75,18 @@ public class GridFutureListenPerformanceTest { futs.add(fut); for (int k = 1; k < rnd.nextInt(3); k++) { - fut.listenAsync(new IgniteInClosure<IgniteInternalFuture<Object>>() { - @Override public void apply(IgniteInternalFuture<Object> t) { - try { - t.get(); + fut.listen(new IgniteInClosure<IgniteInternalFuture<Object>>() { + @Override public void apply(IgniteInternalFuture<Object> t) { + try { + t.get(); + } + catch (IgniteCheckedException e) { + e.printStackTrace(); + } + + cnt.increment(); } - catch (IgniteCheckedException e) { - e.printStackTrace(); - } - - cnt.increment(); - } - }); + }); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/core/src/test/java/org/apache/ignite/internal/util/future/nio/GridNioFutureSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/future/nio/GridNioFutureSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/future/nio/GridNioFutureSelfTest.java index e157b15..18c0119 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/util/future/nio/GridNioFutureSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/future/nio/GridNioFutureSelfTest.java @@ -123,7 +123,7 @@ public class GridNioFutureSelfTest extends GridCommonAbstractTest { final AtomicReference<Exception> err = new AtomicReference<>(); for (int i = 0; i < lsnrCnt; i++) { - fut.listenAsync(new CI1<GridNioFuture<String>>() { + fut.listen(new CI1<GridNioFuture<String>>() { @Override public void apply(GridNioFuture<String> t) { if (Thread.currentThread() != runThread) err.compareAndSet(null, new Exception("Wrong notification thread: " + Thread.currentThread())); @@ -144,7 +144,7 @@ public class GridNioFutureSelfTest extends GridCommonAbstractTest { err.set(null); - fut.listenAsync(new CI1<GridNioFuture<String>>() { + fut.listen(new CI1<GridNioFuture<String>>() { @Override public void apply(GridNioFuture<String> t) { if (Thread.currentThread() != runThread) err.compareAndSet(null, new Exception("Wrong notification thread: " + Thread.currentThread())); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/core/src/test/java/org/apache/ignite/lang/GridFutureListenPerformanceTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/lang/GridFutureListenPerformanceTest.java b/modules/core/src/test/java/org/apache/ignite/lang/GridFutureListenPerformanceTest.java index a139c4f..12d7f74 100644 --- a/modules/core/src/test/java/org/apache/ignite/lang/GridFutureListenPerformanceTest.java +++ b/modules/core/src/test/java/org/apache/ignite/lang/GridFutureListenPerformanceTest.java @@ -75,7 +75,7 @@ public class GridFutureListenPerformanceTest { futs.add(fut); for (int k = 1; k < rnd.nextInt(3); k++) { - fut.listenAsync(new IgniteInClosure<IgniteInternalFuture<Object>>() { + fut.listen(new IgniteInClosure<IgniteInternalFuture<Object>>() { @Override public void apply(IgniteInternalFuture<Object> t) { try { t.get(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java index e1c7091..945759f 100644 --- a/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java @@ -593,11 +593,11 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest { /** * Tests simple message sending-receiving with the use of - * remoteListenAsync() method. + * remoteListen() method. * * @throws Exception If error occurs. */ - public void testRemoteListenAsync() throws Exception { + public void testRemoteListen() throws Exception { final Collection<Object> rcvMsgs = new HashSet<>(); final CountDownLatch rcvLatch = new CountDownLatch(4); @@ -723,11 +723,11 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest { /** * Tests simple message sending-receiving with the use of - * remoteListenAsync() method. + * remoteListen() method. * * @throws Exception If error occurs. */ - public void testRemoteListenAsyncOrderedMessages() throws Exception { + public void testRemoteListenOrderedMessages() throws Exception { List<TestMessage> msgs = Arrays.asList( new TestMessage(MSG_1), new TestMessage(MSG_2, 3000), @@ -777,11 +777,11 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest { /** * Tests simple message sending-receiving with the use of - * remoteListenAsync() method and topics. + * remoteListen() method and topics. * * @throws Exception If error occurs. */ - public void testRemoteListenAsyncWithIntTopic() throws Exception { + public void testRemoteListenWithIntTopic() throws Exception { final Collection<Object> rcvMsgs = new HashSet<>(); final AtomicBoolean error = new AtomicBoolean(false); //to make it modifiable http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java index 2200e78..2f19226 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java @@ -17,7 +17,7 @@ package org.apache.ignite.internal.processors.hadoop.igfs; -import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.internal.*; import org.jetbrains.annotations.*; import java.io.*; @@ -55,7 +55,7 @@ public interface HadoopIgfsEx extends HadoopIgfs { * @param outLen Output length. * @return Read data. */ - public GridPlainFuture<byte[]> readData(HadoopIgfsStreamDelegate delegate, long pos, int len, + public IgniteInternalFuture<byte[]> readData(HadoopIgfsStreamDelegate delegate, long pos, int len, @Nullable final byte[] outBuf, final int outOff, final int outLen); /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsFuture.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsFuture.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsFuture.java index 59a8f49..4e61c32 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsFuture.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsFuture.java @@ -17,13 +17,13 @@ package org.apache.ignite.internal.processors.hadoop.igfs; -import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.internal.util.future.*; import org.jetbrains.annotations.*; /** * IGFS client future that holds response parse closure. */ -public class HadoopIgfsFuture<T> extends GridPlainFutureAdapter<T> { +public class HadoopIgfsFuture<T> extends GridFutureAdapter<T> { /** Output buffer. */ private byte[] outBuf; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java index a8eb58c..44e531e 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java @@ -20,8 +20,9 @@ package org.apache.ignite.internal.processors.hadoop.igfs; import org.apache.commons.logging.*; import org.apache.ignite.*; import org.apache.ignite.igfs.*; +import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.igfs.*; -import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.internal.util.future.*; import org.jetbrains.annotations.*; import java.io.*; @@ -296,7 +297,7 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { } /** {@inheritDoc} */ - @Override public GridPlainFuture<byte[]> readData(HadoopIgfsStreamDelegate delegate, long pos, int len, + @Override public IgniteInternalFuture<byte[]> readData(HadoopIgfsStreamDelegate delegate, long pos, int len, @Nullable byte[] outBuf, int outOff, int outLen) { IgfsInputStreamAdapter stream = delegate.target(); @@ -323,7 +324,7 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { stream.readFully(pos, res, 0, len); } - return new GridPlainFutureAdapter<>(res); + return new GridFinishedFuture<>(res); } catch (IllegalStateException | IOException e) { HadoopIgfsStreamEventListener lsnr = lsnrs.get(delegate); @@ -331,7 +332,7 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { if (lsnr != null) lsnr.onError(e.getMessage()); - return new GridPlainFutureAdapter<>(e); + return new GridFinishedFuture<>(e); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInputStream.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInputStream.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInputStream.java index c335a62..7c66a49 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInputStream.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInputStream.java @@ -20,8 +20,8 @@ package org.apache.ignite.internal.processors.hadoop.igfs; import org.apache.commons.logging.*; import org.apache.hadoop.fs.*; import org.apache.ignite.*; +import org.apache.ignite.internal.*; import org.apache.ignite.internal.igfs.common.*; -import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.jetbrains.annotations.*; @@ -414,7 +414,7 @@ public final class HadoopIgfsInputStream extends InputStream implements Seekable */ private static class FetchBufferPart { /** Read future. */ - private GridPlainFuture<byte[]> readFut; + private IgniteInternalFuture<byte[]> readFut; /** Position of cached chunk in file. */ private long pos; @@ -429,7 +429,7 @@ public final class HadoopIgfsInputStream extends InputStream implements Seekable * @param pos Read position. * @param len Chunk length. */ - private FetchBufferPart(GridPlainFuture<byte[]> readFut, long pos, int len) { + private FetchBufferPart(IgniteInternalFuture<byte[]> readFut, long pos, int len) { this.readFut = readFut; this.pos = pos; this.len = len; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIo.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIo.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIo.java index 88dd896..75bc27b 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIo.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIo.java @@ -18,8 +18,8 @@ package org.apache.ignite.internal.processors.hadoop.igfs; import org.apache.ignite.*; +import org.apache.ignite.internal.*; import org.apache.ignite.internal.igfs.common.*; -import org.apache.ignite.internal.util.lang.*; import org.jetbrains.annotations.*; /** @@ -34,7 +34,7 @@ public interface HadoopIgfsIo { * @return Future that will be completed. * @throws IgniteCheckedException If a message cannot be sent (connection is broken or client was closed). */ - public GridPlainFuture<IgfsMessage> send(IgfsMessage msg) throws IgniteCheckedException; + public IgniteInternalFuture<IgfsMessage> send(IgfsMessage msg) throws IgniteCheckedException; /** * Sends given IGFS client message and asynchronously awaits for response. When IO detects response @@ -48,7 +48,7 @@ public interface HadoopIgfsIo { * @return Future that will be completed when response is returned from closure. * @throws IgniteCheckedException If a message cannot be sent (connection is broken or client was closed). */ - public <T> GridPlainFuture<T> send(IgfsMessage msg, @Nullable byte[] outBuf, int outOff, int outLen) + public <T> IgniteInternalFuture<T> send(IgfsMessage msg, @Nullable byte[] outBuf, int outOff, int outLen) throws IgniteCheckedException; /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIo.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIo.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIo.java index c9c61fe..b3b981b 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIo.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIo.java @@ -22,6 +22,7 @@ import org.apache.ignite.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.igfs.common.*; import org.apache.ignite.internal.util.*; +import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.ipc.*; import org.apache.ignite.internal.util.ipc.shmem.*; import org.apache.ignite.internal.util.lang.*; @@ -316,12 +317,12 @@ public class HadoopIgfsIpcIo implements HadoopIgfsIo { } /** {@inheritDoc} */ - @Override public GridPlainFuture<IgfsMessage> send(IgfsMessage msg) throws IgniteCheckedException { + @Override public IgniteInternalFuture<IgfsMessage> send(IgfsMessage msg) throws IgniteCheckedException { return send(msg, null, 0, 0); } /** {@inheritDoc} */ - @Override public <T> GridPlainFuture<T> send(IgfsMessage msg, @Nullable byte[] outBuf, int outOff, + @Override public <T> IgniteInternalFuture<T> send(IgfsMessage msg, @Nullable byte[] outBuf, int outOff, int outLen) throws IgniteCheckedException { assert outBuf == null || msg.command() == IgfsIpcCommand.READ_BLOCK; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java index 662541a..a4ddd3c 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java @@ -20,9 +20,12 @@ package org.apache.ignite.internal.processors.hadoop.igfs; import org.apache.commons.logging.*; import org.apache.ignite.*; import org.apache.ignite.igfs.*; +import org.apache.ignite.internal.*; import org.apache.ignite.internal.igfs.common.*; import org.apache.ignite.internal.processors.igfs.*; +import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.lang.*; import org.jdk8.backport.*; import org.jetbrains.annotations.*; @@ -36,40 +39,40 @@ import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.*; */ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener { /** Expected result is boolean. */ - private static final GridPlainClosure<GridPlainFuture<IgfsMessage>, Boolean> BOOL_RES = createClosure(); + private static final IgniteClosure<IgniteInternalFuture<IgfsMessage>, Boolean> BOOL_RES = createClosure(); /** Expected result is boolean. */ - private static final GridPlainClosure<GridPlainFuture<IgfsMessage>, Long> LONG_RES = createClosure(); + private static final IgniteClosure<IgniteInternalFuture<IgfsMessage>, Long> LONG_RES = createClosure(); /** Expected result is {@code IgfsFile}. */ - private static final GridPlainClosure<GridPlainFuture<IgfsMessage>, IgfsFile> FILE_RES = createClosure(); + private static final IgniteClosure<IgniteInternalFuture<IgfsMessage>, IgfsFile> FILE_RES = createClosure(); /** Expected result is {@code IgfsHandshakeResponse} */ - private static final GridPlainClosure<GridPlainFuture<IgfsMessage>, + private static final IgniteClosure<IgniteInternalFuture<IgfsMessage>, IgfsHandshakeResponse> HANDSHAKE_RES = createClosure(); /** Expected result is {@code IgfsStatus} */ - private static final GridPlainClosure<GridPlainFuture<IgfsMessage>, IgfsStatus> STATUS_RES = + private static final IgniteClosure<IgniteInternalFuture<IgfsMessage>, IgfsStatus> STATUS_RES = createClosure(); /** Expected result is {@code IgfsFile}. */ - private static final GridPlainClosure<GridPlainFuture<IgfsMessage>, + private static final IgniteClosure<IgniteInternalFuture<IgfsMessage>, IgfsInputStreamDescriptor> STREAM_DESCRIPTOR_RES = createClosure(); /** Expected result is {@code IgfsFile}. */ - private static final GridPlainClosure<GridPlainFuture<IgfsMessage>, + private static final IgniteClosure<IgniteInternalFuture<IgfsMessage>, Collection<IgfsFile>> FILE_COL_RES = createClosure(); /** Expected result is {@code IgfsFile}. */ - private static final GridPlainClosure<GridPlainFuture<IgfsMessage>, + private static final IgniteClosure<IgniteInternalFuture<IgfsMessage>, Collection<IgfsPath>> PATH_COL_RES = createClosure(); /** Expected result is {@code IgfsPathSummary}. */ - private static final GridPlainClosure<GridPlainFuture<IgfsMessage>, IgfsPathSummary> SUMMARY_RES = + private static final IgniteClosure<IgniteInternalFuture<IgfsMessage>, IgfsPathSummary> SUMMARY_RES = createClosure(); /** Expected result is {@code IgfsFile}. */ - private static final GridPlainClosure<GridPlainFuture<IgfsMessage>, + private static final IgniteClosure<IgniteInternalFuture<IgfsMessage>, Collection<IgfsBlockLocation>> BLOCK_LOCATION_COL_RES = createClosure(); /** Grid name. */ @@ -340,7 +343,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener } /** {@inheritDoc} */ - @Override public GridPlainFuture<byte[]> readData(HadoopIgfsStreamDelegate desc, long pos, int len, + @Override public IgniteInternalFuture<byte[]> readData(HadoopIgfsStreamDelegate desc, long pos, int len, final @Nullable byte[] outBuf, final int outOff, final int outLen) { assert len > 0; @@ -355,7 +358,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener return io.send(msg, outBuf, outOff, outLen); } catch (IgniteCheckedException e) { - return new GridPlainFutureAdapter<>(e); + return new GridFinishedFuture<>(e); } } @@ -451,15 +454,21 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener * @return Conversion closure. */ @SuppressWarnings("unchecked") - private static <T> GridPlainClosure<GridPlainFuture<IgfsMessage>, T> createClosure() { - return new GridPlainClosure<GridPlainFuture<IgfsMessage>, T>() { - @Override public T apply(GridPlainFuture<IgfsMessage> fut) throws IgniteCheckedException { - IgfsControlResponse res = (IgfsControlResponse)fut.get(); + private static <T> IgniteClosure<IgniteInternalFuture<IgfsMessage>, T> createClosure() { + return new IgniteClosure<IgniteInternalFuture<IgfsMessage>, T>() { + @Override public T apply(IgniteInternalFuture<IgfsMessage> fut) { + try { + IgfsControlResponse res = (IgfsControlResponse)fut.get(); - if (res.hasError()) - res.throwError(); + if (res.hasError()) + res.throwError(); - return (T)res.response(); + return (T)res.response(); + + } + catch (IgniteCheckedException e) { + throw new GridClosureException(e); + } } }; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java index 30133f5..0ca61bc 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java @@ -474,7 +474,7 @@ public class HadoopJobTracker extends HadoopComponent { GridCacheProjectionEx<HadoopJobId, HadoopJobMetadata> cache = finishedJobMetaCache(); cache.invokeAsync(info.jobId(), new UpdatePhaseProcessor(incrCntrs, PHASE_COMPLETE)). - listenAsync(failsLog); + listen(failsLog); break; } @@ -490,7 +490,7 @@ public class HadoopJobTracker extends HadoopComponent { * @param c Closure of operation. */ private void transform(HadoopJobId jobId, EntryProcessor<HadoopJobId, HadoopJobMetadata, Void> c) { - jobMetaCache().invokeAsync(jobId, c).listenAsync(failsLog); + jobMetaCache().invokeAsync(jobId, c).listen(failsLog); } /** @@ -1204,7 +1204,7 @@ public class HadoopJobTracker extends HadoopComponent { }; if (lastMapperFinished) - ctx.shuffle().flush(jobId).listenAsync(cacheUpdater); + ctx.shuffle().flush(jobId).listen(cacheUpdater); else cacheUpdater.apply(null); } @@ -1236,7 +1236,7 @@ public class HadoopJobTracker extends HadoopComponent { // Fail the whole job. transform(jobId, new RemoveMappersProcessor(prev, currMappers, status.failCause())); else { - ctx.shuffle().flush(jobId).listenAsync(new CIX1<IgniteInternalFuture<?>>() { + ctx.shuffle().flush(jobId).listen(new CIX1<IgniteInternalFuture<?>>() { @Override public void applyx(IgniteInternalFuture<?> f) { Throwable err = null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobStatusTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobStatusTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobStatusTask.java index 1734562..0292e06 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobStatusTask.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobStatusTask.java @@ -61,7 +61,7 @@ public class HadoopProtocolJobStatusTask extends HadoopProtocolTaskAdapter<Hadoo if (fut.isDone() || F.eq(jobCtx.getAttribute(ATTR_HELD), true)) return hadoop.status(jobId); else { - fut.listenAsync(new IgniteInClosure<IgniteInternalFuture<?>>() { + fut.listen(new IgniteInClosure<IgniteInternalFuture<?>>() { @Override public void apply(IgniteInternalFuture<?> fut0) { jobCtx.callcc(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java index 1f92c66..a19b754 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java @@ -383,7 +383,7 @@ public class HadoopShuffleJob<T> implements AutoCloseable { fut.onDone(U.unwrap(e)); } - fut.listenAsync(new IgniteInClosure<IgniteInternalFuture<?>>() { + fut.listen(new IgniteInClosure<IgniteInternalFuture<?>>() { @Override public void apply(IgniteInternalFuture<?> f) { try { f.get(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java index 7fe9d19..72ca0eb 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java @@ -156,8 +156,9 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter { "[jobId=" + meta.jobId() + ", meta=" + meta + ']'); } else { - proc.initFut.listenAsync(new CI1<IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>>>() { - @Override public void apply(IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>> f) { + proc.initFut.listen(new CI1<IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>>>() { + @Override + public void apply(IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>> f) { try { f.get(); @@ -223,7 +224,7 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter { final HadoopProcess proc0 = proc; - proc.initFut.listenAsync(new CI1<IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>>>() { + proc.initFut.listen(new CI1<IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>>>() { @Override public void apply( IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>> f) { if (!busyLock.tryReadLock()) @@ -404,7 +405,7 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter { } }, true); - fut.listenAsync(new CI1<IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>>>() { + fut.listen(new CI1<IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>>>() { @Override public void apply(IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>> f) { try { // Make sure there were no exceptions. @@ -789,7 +790,7 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter { terminated = true; if (!initFut.isDone()) - initFut.listenAsync(new CI1<IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>>>() { + initFut.listen(new CI1<IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>>>() { @Override public void apply( IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>> f) { proc.destroy(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java index 831885f..040552a 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java @@ -147,7 +147,7 @@ public class HadoopChildProcessRunner { if (!initFut.isDone() && log.isDebugEnabled()) log.debug("Will wait for process initialization future completion: " + req); - initFut.listenAsync(new CI1<IgniteInternalFuture<?>>() { + initFut.listen(new CI1<IgniteInternalFuture<?>>() { @Override public void apply(IgniteInternalFuture<?> f) { try { // Make sure init was successful. @@ -219,7 +219,7 @@ public class HadoopChildProcessRunner { * @param req Update request. */ private void updateTasks(final HadoopJobInfoUpdateRequest req) { - initFut.listenAsync(new CI1<IgniteInternalFuture<?>>() { + initFut.listen(new CI1<IgniteInternalFuture<?>>() { @Override public void apply(IgniteInternalFuture<?> gridFut) { assert initGuard.get(); @@ -316,7 +316,7 @@ public class HadoopChildProcessRunner { final long start = U.currentTimeMillis(); try { - shuffleJob.flush().listenAsync(new CI1<IgniteInternalFuture<?>>() { + shuffleJob.flush().listen(new CI1<IgniteInternalFuture<?>>() { @Override public void apply(IgniteInternalFuture<?> f) { long end = U.currentTimeMillis(); @@ -396,7 +396,7 @@ public class HadoopChildProcessRunner { if (log.isTraceEnabled()) log.trace("Received shuffle message [desc=" + desc + ", msg=" + msg + ']'); - initFut.listenAsync(new CI1<IgniteInternalFuture<?>>() { + initFut.listen(new CI1<IgniteInternalFuture<?>>() { @Override public void apply(IgniteInternalFuture<?> f) { try { HadoopShuffleMessage m = (HadoopShuffleMessage)msg; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04a317be/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunication.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunication.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunication.java index 499f2fa..38aee5a 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunication.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunication.java @@ -1243,7 +1243,7 @@ public class HadoopExternalCommunication { log.debug("Accepted connection, initiating handshake: " + ses); // Server initiates handshake. - ses.send(locIdMsg).listenAsync(new CI1<GridNioFuture<?>>() { + ses.send(locIdMsg).listen(new CI1<GridNioFuture<?>>() { @Override public void apply(GridNioFuture<?> fut) { try { // Make sure there were no errors.