ignite-11 use ExecutorCompletionService.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/8b9e7f8f Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/8b9e7f8f Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/8b9e7f8f Branch: refs/heads/ignite-342 Commit: 8b9e7f8f5ae205da389411f5aa763d329e3ea6bd Parents: de75adb Author: sevdokimov <sevdoki...@gridgain.com> Authored: Thu Feb 26 17:23:52 2015 +0300 Committer: sevdokimov <sevdoki...@gridgain.com> Committed: Thu Feb 26 17:23:52 2015 +0300 ---------------------------------------------------------------------- .../ignite/internal/util/IgniteUtils.java | 1 + .../discovery/tcp/TcpDiscoverySpiAdapter.java | 85 +++++++++----------- 2 files changed, 40 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8b9e7f8f/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java index a8b6991..b24743a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java @@ -298,6 +298,7 @@ public abstract class IgniteUtils { private static final Map<Class<? extends IgniteCheckedException>, C1<IgniteCheckedException, IgniteException>> exceptionConverters; + /** */ private volatile static IgniteBiTuple<Collection<String>, Collection<String>> cachedLocalAddr; /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8b9e7f8f/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java index 322b954..80b793a 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java @@ -1018,13 +1018,10 @@ abstract class TcpDiscoverySpiAdapter extends IgniteSpiAdapter implements Discov private int connInProgress; /** */ - private boolean closed; - - /** */ private final ExecutorService executor; /** */ - private final Queue<GridTuple3<InetSocketAddress, Socket, Exception>> queue = new LinkedList<>(); + private final CompletionService<GridTuple3<InetSocketAddress, Socket, Exception>> completionSrvc; /** * @param addrs Addresses. @@ -1033,19 +1030,19 @@ abstract class TcpDiscoverySpiAdapter extends IgniteSpiAdapter implements Discov public SocketMultiConnector(Collection<InetSocketAddress> addrs, final int retryCnt) { connInProgress = addrs.size(); - executor = Executors.newFixedThreadPool(Math.min(10, addrs.size())); + executor = Executors.newFixedThreadPool(Math.min(1, addrs.size())); + + completionSrvc = new ExecutorCompletionService<>(executor); for (final InetSocketAddress addr : addrs) { - executor.execute(new Runnable() { - @Override public void run() { + completionSrvc.submit(new Callable<GridTuple3<InetSocketAddress, Socket, Exception>>() { + @Override public GridTuple3<InetSocketAddress, Socket, Exception> call() { Exception ex = null; Socket sock = null; for (int i = 0; i < retryCnt; i++) { - synchronized (SocketMultiConnector.this) { - if (closed) - return; - } + if (Thread.currentThread().isInterrupted()) + return null; // Executor is shutdown. try { sock = openSocket(addr); @@ -1057,16 +1054,7 @@ abstract class TcpDiscoverySpiAdapter extends IgniteSpiAdapter implements Discov } } - synchronized (SocketMultiConnector.this) { - if (closed) - U.closeQuiet(sock); - else - queue.add(new GridTuple3<>(addr, sock, ex)); - - connInProgress--; - - SocketMultiConnector.this.notifyAll(); - } + return new GridTuple3<>(addr, sock, ex); } }); } @@ -1075,46 +1063,51 @@ abstract class TcpDiscoverySpiAdapter extends IgniteSpiAdapter implements Discov /** * */ - @Nullable public synchronized GridTuple3<InetSocketAddress, Socket, Exception> next() { - try { - do { - if (closed) - return null; + @Nullable public GridTuple3<InetSocketAddress, Socket, Exception> next() { + if (connInProgress == 0) + return null; - GridTuple3<InetSocketAddress, Socket, Exception> res = queue.poll(); - - if (res != null) - return res; - - if (connInProgress == 0) - return null; + try { + connInProgress--; - wait(); - } - while (true); + return completionSrvc.take().get(); } catch (InterruptedException e) { throw new IgniteSpiException("Thread has been interrupted.", e); } + catch (ExecutionException e) { + throw new IgniteSpiException(e); + } } /** * */ public void close() { - synchronized (this) { - if (closed) - return; - - closed = true; + executor.shutdown(); - notifyAll(); - } + if (connInProgress > 0) { + new Thread(new Runnable() { + @Override public void run() { + try { + for (int i = 0; i < connInProgress; i++) { + try { + GridTuple3<InetSocketAddress, Socket, Exception> take = completionSrvc.take().get(); - executor.shutdown(); + if (take != null) + IgniteUtils.closeQuiet(take.get2()); + } + catch (ExecutionException ignored) { - for (GridTuple3<InetSocketAddress, Socket, Exception> tuple : queue) - U.closeQuiet(tuple.get2()); + } + } + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + }).start(); + } } } }