Repository: incubator-ignite
Updated Branches:
  refs/heads/sprint-1 9671e2187 -> e608da8fe


IGNITE-160 Removed executors from IgniteStreamConfiguration


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/cd74aab7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/cd74aab7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/cd74aab7

Branch: refs/heads/sprint-1
Commit: cd74aab73d279ad1ffe469dc2d72314cb6a55e93
Parents: e533d15
Author: nikolay_tikhonov <ntikho...@gridgain.com>
Authored: Sun Feb 15 17:10:37 2015 +0300
Committer: nikolay_tikhonov <ntikho...@gridgain.com>
Committed: Sun Feb 15 17:10:37 2015 +0300

----------------------------------------------------------------------
 .../processors/streamer/IgniteStreamerImpl.java | 41 ++++------------
 .../streamer/VisorStreamerConfiguration.java    | 19 --------
 .../ignite/streamer/StreamerConfiguration.java  | 50 +++++---------------
 3 files changed, 21 insertions(+), 89 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cd74aab7/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/IgniteStreamerImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/IgniteStreamerImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/IgniteStreamerImpl.java
index 7ca3f9b..4bfb8c5 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/IgniteStreamerImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/IgniteStreamerImpl.java
@@ -118,9 +118,6 @@ public class IgniteStreamerImpl implements 
IgniteStreamerEx, Externalizable {
     /** Streamer executor service. */
     private ExecutorService execSvc;
 
-    /** Will be set to true if executor service is created on start. */
-    private boolean dfltExecSvc;
-
     /** Failure listeners. */
     private Collection<StreamerFailureListener> failureLsnrs = new 
ConcurrentLinkedQueue<>();
 
@@ -246,28 +243,14 @@ public class IgniteStreamerImpl implements 
IgniteStreamerEx, Externalizable {
                 dfltWin = w;
         }
 
-        execSvc = c.getExecutorService();
+        execSvc = new IgniteThreadPoolExecutor(
+            ctx.gridName(),
+            c.getThreadPoolSize(),
+            c.getThreadPoolSize(),
+            0,
+            new LinkedBlockingQueue<Runnable>());
 
-        if (execSvc == null) {
-            execSvc = new IgniteThreadPoolExecutor(
-                ctx.gridName(),
-                Runtime.getRuntime().availableProcessors(),
-                Runtime.getRuntime().availableProcessors(),
-                0,
-                new LinkedBlockingQueue<Runnable>());
-
-            execSvcCap = Runtime.getRuntime().availableProcessors();
-            dfltExecSvc = true;
-        }
-        else {
-            if (execSvc instanceof ThreadPoolExecutor) {
-                ThreadPoolExecutor exec = (ThreadPoolExecutor)execSvc;
-
-                execSvcCap = exec.getMaximumPoolSize();
-            }
-            else
-                execSvcCap = -1;
-        }
+        execSvcCap = c.getThreadPoolSize();
 
         resetMetrics();
 
@@ -382,14 +365,8 @@ public class IgniteStreamerImpl implements 
IgniteStreamerEx, Externalizable {
     public void stop(boolean cancel) {
         ctx.io().removeMessageListener(topic);
 
-        if (dfltExecSvc)
-            // There is no point to wait for tasks execution since it was 
already handled by flags.
-            execSvc.shutdownNow();
-        else {
-            // Cannot call shutdownNow here since there may be user tasks in 
thread pool which cannot be discarded.
-            if (c.isExecutorServiceShutdown())
-                execSvc.shutdown();
-        }
+        // There is no point to wait for tasks execution since it was already 
handled by flags.
+        execSvc.shutdownNow();
 
         U.stopLifecycleAware(log, lifecycleAwares());
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cd74aab7/modules/core/src/main/java/org/apache/ignite/internal/visor/streamer/VisorStreamerConfiguration.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/visor/streamer/VisorStreamerConfiguration.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/visor/streamer/VisorStreamerConfiguration.java
index 7f23c3c..3c32b6b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/visor/streamer/VisorStreamerConfiguration.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/visor/streamer/VisorStreamerConfiguration.java
@@ -48,9 +48,6 @@ public class VisorStreamerConfiguration implements 
Serializable {
     /** Maximum number of concurrent events to be processed. */
     private int maxConcurrentSessions;
 
-    /** Flag indicating whether streamer executor service should be shut down 
on Ignite stop. */
-    private boolean executorSrvcShutdown;
-
     /**
      * @param scfg Streamer configuration.
      * @return Data transfer object for streamer configuration properties.
@@ -63,7 +60,6 @@ public class VisorStreamerConfiguration implements 
Serializable {
         cfg.atLeastOnce(scfg.isAtLeastOnce());
         cfg.maximumFailoverAttempts(scfg.getMaximumFailoverAttempts());
         cfg.maximumConcurrentSessions(scfg.getMaximumConcurrentSessions());
-        cfg.executorServiceShutdown(scfg.isExecutorServiceShutdown());
 
         return cfg;
     }
@@ -156,21 +152,6 @@ public class VisorStreamerConfiguration implements 
Serializable {
         this.maxConcurrentSessions = maxConcurrentSessions;
     }
 
-    /**
-     * @return Flag indicating whether streamer executor service should be 
shut down on Ignite stop.
-     */
-    public boolean executorServiceShutdown() {
-        return executorSrvcShutdown;
-    }
-
-    /**
-     * @param executorSrvcShutdown New flag indicating whether streamer 
executor service should be shutdown
-     *      on Ignite stop.
-     */
-    public void executorServiceShutdown(boolean executorSrvcShutdown) {
-        this.executorSrvcShutdown = executorSrvcShutdown;
-    }
-
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(VisorStreamerConfiguration.class, this);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cd74aab7/modules/core/src/main/java/org/apache/ignite/streamer/StreamerConfiguration.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/streamer/StreamerConfiguration.java
 
b/modules/core/src/main/java/org/apache/ignite/streamer/StreamerConfiguration.java
index 3cb4724..d0e895f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/streamer/StreamerConfiguration.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/streamer/StreamerConfiguration.java
@@ -22,7 +22,6 @@ import org.apache.ignite.internal.util.typedef.internal.*;
 import org.jetbrains.annotations.*;
 
 import java.util.*;
-import java.util.concurrent.*;
 
 /**
  * Streamer configuration.
@@ -56,11 +55,8 @@ public class StreamerConfiguration {
     /** Maximum number of concurrent sessions to be processed. */
     private int maxConcurrentSessions = DFLT_MAX_CONCURRENT_SESSIONS;
 
-    /** Streamer executor service. */
-    private ExecutorService execSvc;
-
-    /** Executor service shutdown flag. */
-    private boolean execSvcShutdown;
+    /** Streamer thread pool size. */
+    private int poolSize = Runtime.getRuntime().availableProcessors();
 
     /**
      *
@@ -74,8 +70,7 @@ public class StreamerConfiguration {
      */
     public StreamerConfiguration(StreamerConfiguration c) {
         atLeastOnce = c.isAtLeastOnce();
-        execSvc = c.getExecutorService();
-        execSvcShutdown = c.isExecutorServiceShutdown();
+        poolSize = c.getThreadPoolSize();
         maxConcurrentSessions = c.getMaximumConcurrentSessions();
         maxFailoverAttempts = c.getMaximumFailoverAttempts();
         name = c.getName();
@@ -227,45 +222,24 @@ public class StreamerConfiguration {
     }
 
     /**
-     * Gets streamer executor service. Defines a thread pool in which streamer 
stages will be executed.
+     * Gets streamer pool size. Defines a thread pool size in which streamer 
stages will be executed.
      * <p>
      * If not specified, thread pool executor with max pool size equal to 
number of cores will be created.
      *
-     * @return Streamer executor service.
-     */
-    public ExecutorService getExecutorService() {
-        return execSvc;
-    }
-
-    /**
-     * Sets streamer executor service.
-     *
-     * @param execSvc Executor service to use.
-     * @see #getExecutorService()
-     */
-    public void setExecutorService(ExecutorService execSvc) {
-        this.execSvc = execSvc;
-    }
-
-    /**
-     * Flag indicating whether streamer executor service should be shut down 
on Ignite stop. If this flag
-     * is {@code true}, executor service will be shut down regardless of 
whether executor was specified externally
-     * or it was created by Ignite.
-     *
-     * @return {@code True} if executor service should be shut down on Ignite 
stop.
+     * @return Streamer thread pool size.
      */
-    public boolean isExecutorServiceShutdown() {
-        return execSvcShutdown;
+    public int getThreadPoolSize() {
+        return poolSize;
     }
 
     /**
-     * Sets flag indicating whether executor service should be shut down on 
Ignite stop.
+     * Sets streamer pool size.
      *
-     * @param execSvcShutdown {@code True} if executor service should be shut 
down on Ignite stop.
-     * @see #isExecutorServiceShutdown()
+     * @param poolSize Pool size.
+     * @see #getThreadPoolSize()
      */
-    public void setExecutorServiceShutdown(boolean execSvcShutdown) {
-        this.execSvcShutdown = execSvcShutdown;
+    public void setThreadPoolSize(int poolSize) {
+        this.poolSize = poolSize;
     }
 
     /** {@inheritDoc} */

Reply via email to