Repository: incubator-ignite Updated Branches: refs/heads/IGNITE-160 [created] cd74aab73
IGNITE-160 Removed executors from IgniteStreamConfiguration Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/cd74aab7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/cd74aab7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/cd74aab7 Branch: refs/heads/IGNITE-160 Commit: cd74aab73d279ad1ffe469dc2d72314cb6a55e93 Parents: e533d15 Author: nikolay_tikhonov <ntikho...@gridgain.com> Authored: Sun Feb 15 17:10:37 2015 +0300 Committer: nikolay_tikhonov <ntikho...@gridgain.com> Committed: Sun Feb 15 17:10:37 2015 +0300 ---------------------------------------------------------------------- .../processors/streamer/IgniteStreamerImpl.java | 41 ++++------------ .../streamer/VisorStreamerConfiguration.java | 19 -------- .../ignite/streamer/StreamerConfiguration.java | 50 +++++--------------- 3 files changed, 21 insertions(+), 89 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cd74aab7/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/IgniteStreamerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/IgniteStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/IgniteStreamerImpl.java index 7ca3f9b..4bfb8c5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/IgniteStreamerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/IgniteStreamerImpl.java @@ -118,9 +118,6 @@ public class IgniteStreamerImpl implements IgniteStreamerEx, Externalizable { /** Streamer executor service. */ private ExecutorService execSvc; - /** Will be set to true if executor service is created on start. */ - private boolean dfltExecSvc; - /** Failure listeners. */ private Collection<StreamerFailureListener> failureLsnrs = new ConcurrentLinkedQueue<>(); @@ -246,28 +243,14 @@ public class IgniteStreamerImpl implements IgniteStreamerEx, Externalizable { dfltWin = w; } - execSvc = c.getExecutorService(); + execSvc = new IgniteThreadPoolExecutor( + ctx.gridName(), + c.getThreadPoolSize(), + c.getThreadPoolSize(), + 0, + new LinkedBlockingQueue<Runnable>()); - if (execSvc == null) { - execSvc = new IgniteThreadPoolExecutor( - ctx.gridName(), - Runtime.getRuntime().availableProcessors(), - Runtime.getRuntime().availableProcessors(), - 0, - new LinkedBlockingQueue<Runnable>()); - - execSvcCap = Runtime.getRuntime().availableProcessors(); - dfltExecSvc = true; - } - else { - if (execSvc instanceof ThreadPoolExecutor) { - ThreadPoolExecutor exec = (ThreadPoolExecutor)execSvc; - - execSvcCap = exec.getMaximumPoolSize(); - } - else - execSvcCap = -1; - } + execSvcCap = c.getThreadPoolSize(); resetMetrics(); @@ -382,14 +365,8 @@ public class IgniteStreamerImpl implements IgniteStreamerEx, Externalizable { public void stop(boolean cancel) { ctx.io().removeMessageListener(topic); - if (dfltExecSvc) - // There is no point to wait for tasks execution since it was already handled by flags. - execSvc.shutdownNow(); - else { - // Cannot call shutdownNow here since there may be user tasks in thread pool which cannot be discarded. - if (c.isExecutorServiceShutdown()) - execSvc.shutdown(); - } + // There is no point to wait for tasks execution since it was already handled by flags. + execSvc.shutdownNow(); U.stopLifecycleAware(log, lifecycleAwares()); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cd74aab7/modules/core/src/main/java/org/apache/ignite/internal/visor/streamer/VisorStreamerConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/streamer/VisorStreamerConfiguration.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/streamer/VisorStreamerConfiguration.java index 7f23c3c..3c32b6b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/streamer/VisorStreamerConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/streamer/VisorStreamerConfiguration.java @@ -48,9 +48,6 @@ public class VisorStreamerConfiguration implements Serializable { /** Maximum number of concurrent events to be processed. */ private int maxConcurrentSessions; - /** Flag indicating whether streamer executor service should be shut down on Ignite stop. */ - private boolean executorSrvcShutdown; - /** * @param scfg Streamer configuration. * @return Data transfer object for streamer configuration properties. @@ -63,7 +60,6 @@ public class VisorStreamerConfiguration implements Serializable { cfg.atLeastOnce(scfg.isAtLeastOnce()); cfg.maximumFailoverAttempts(scfg.getMaximumFailoverAttempts()); cfg.maximumConcurrentSessions(scfg.getMaximumConcurrentSessions()); - cfg.executorServiceShutdown(scfg.isExecutorServiceShutdown()); return cfg; } @@ -156,21 +152,6 @@ public class VisorStreamerConfiguration implements Serializable { this.maxConcurrentSessions = maxConcurrentSessions; } - /** - * @return Flag indicating whether streamer executor service should be shut down on Ignite stop. - */ - public boolean executorServiceShutdown() { - return executorSrvcShutdown; - } - - /** - * @param executorSrvcShutdown New flag indicating whether streamer executor service should be shutdown - * on Ignite stop. - */ - public void executorServiceShutdown(boolean executorSrvcShutdown) { - this.executorSrvcShutdown = executorSrvcShutdown; - } - /** {@inheritDoc} */ @Override public String toString() { return S.toString(VisorStreamerConfiguration.class, this); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cd74aab7/modules/core/src/main/java/org/apache/ignite/streamer/StreamerConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/StreamerConfiguration.java b/modules/core/src/main/java/org/apache/ignite/streamer/StreamerConfiguration.java index 3cb4724..d0e895f 100644 --- a/modules/core/src/main/java/org/apache/ignite/streamer/StreamerConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/streamer/StreamerConfiguration.java @@ -22,7 +22,6 @@ import org.apache.ignite.internal.util.typedef.internal.*; import org.jetbrains.annotations.*; import java.util.*; -import java.util.concurrent.*; /** * Streamer configuration. @@ -56,11 +55,8 @@ public class StreamerConfiguration { /** Maximum number of concurrent sessions to be processed. */ private int maxConcurrentSessions = DFLT_MAX_CONCURRENT_SESSIONS; - /** Streamer executor service. */ - private ExecutorService execSvc; - - /** Executor service shutdown flag. */ - private boolean execSvcShutdown; + /** Streamer thread pool size. */ + private int poolSize = Runtime.getRuntime().availableProcessors(); /** * @@ -74,8 +70,7 @@ public class StreamerConfiguration { */ public StreamerConfiguration(StreamerConfiguration c) { atLeastOnce = c.isAtLeastOnce(); - execSvc = c.getExecutorService(); - execSvcShutdown = c.isExecutorServiceShutdown(); + poolSize = c.getThreadPoolSize(); maxConcurrentSessions = c.getMaximumConcurrentSessions(); maxFailoverAttempts = c.getMaximumFailoverAttempts(); name = c.getName(); @@ -227,45 +222,24 @@ public class StreamerConfiguration { } /** - * Gets streamer executor service. Defines a thread pool in which streamer stages will be executed. + * Gets streamer pool size. Defines a thread pool size in which streamer stages will be executed. * <p> * If not specified, thread pool executor with max pool size equal to number of cores will be created. * - * @return Streamer executor service. - */ - public ExecutorService getExecutorService() { - return execSvc; - } - - /** - * Sets streamer executor service. - * - * @param execSvc Executor service to use. - * @see #getExecutorService() - */ - public void setExecutorService(ExecutorService execSvc) { - this.execSvc = execSvc; - } - - /** - * Flag indicating whether streamer executor service should be shut down on Ignite stop. If this flag - * is {@code true}, executor service will be shut down regardless of whether executor was specified externally - * or it was created by Ignite. - * - * @return {@code True} if executor service should be shut down on Ignite stop. + * @return Streamer thread pool size. */ - public boolean isExecutorServiceShutdown() { - return execSvcShutdown; + public int getThreadPoolSize() { + return poolSize; } /** - * Sets flag indicating whether executor service should be shut down on Ignite stop. + * Sets streamer pool size. * - * @param execSvcShutdown {@code True} if executor service should be shut down on Ignite stop. - * @see #isExecutorServiceShutdown() + * @param poolSize Pool size. + * @see #getThreadPoolSize() */ - public void setExecutorServiceShutdown(boolean execSvcShutdown) { - this.execSvcShutdown = execSvcShutdown; + public void setThreadPoolSize(int poolSize) { + this.poolSize = poolSize; } /** {@inheritDoc} */