Repository: incubator-ignite Updated Branches: refs/heads/ignite-1058 6d2651082 -> 29fe50c78
#[GG-10298]: workable solution. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/999b5f03 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/999b5f03 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/999b5f03 Branch: refs/heads/ignite-1058 Commit: 999b5f03a0fabb959b3600e3554db01f92d4d79e Parents: e1c49b7 Author: iveselovskiy <iveselovs...@gridgain.com> Authored: Thu Jun 25 20:06:06 2015 +0300 Committer: iveselovskiy <iveselovs...@gridgain.com> Committed: Thu Jun 25 20:06:06 2015 +0300 ---------------------------------------------------------------------- .../configuration/IgniteConfiguration.java | 32 ++++++++++++++++++++ .../ignite/internal/GridKernalContext.java | 5 +++ .../ignite/internal/GridKernalContextImpl.java | 11 +++++++ .../apache/ignite/internal/IgniteKernal.java | 4 ++- .../org/apache/ignite/internal/IgnitionEx.java | 12 +++++++- .../managers/communication/GridIoManager.java | 15 ++++++++- .../managers/communication/GridIoPolicy.java | 5 ++- .../junits/GridTestKernalContext.java | 10 ++++++ 8 files changed, 90 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/999b5f03/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java index 2d36c7a..73d0c80 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java @@ -156,12 +156,18 @@ public class IgniteConfiguration { /** Default max queue capacity of system thread pool. */ public static final int DFLT_SYSTEM_THREADPOOL_QUEUE_CAP = Integer.MAX_VALUE; + /** Default max queue capacity of DR (Data Replication) thread pool. */ + public static final int DFLT_DR_THREADPOOL_QUEUE_CAP = 40; + /** Default size of peer class loading thread pool. */ public static final int DFLT_P2P_THREAD_CNT = 2; /** Default size of management thread pool. */ public static final int DFLT_MGMT_THREAD_CNT = 4; + /** Default size of DR (Data Replication) thread pool. */ + public static final int DFLT_DR_THREAD_CNT = 4; + /** Default segmentation policy. */ public static final SegmentationPolicy DFLT_SEG_PLC = STOP; @@ -210,6 +216,9 @@ public class IgniteConfiguration { /** IGFS pool size. */ private int igfsPoolSize = AVAILABLE_PROC_CNT; + /** DR pool size. */ + private int drPoolSize = DFLT_DR_THREAD_CNT; + /** Utility cache pool size. */ private int utilityCachePoolSize = DFLT_SYSTEM_CORE_THREAD_CNT; @@ -682,6 +691,17 @@ public class IgniteConfiguration { } /** + * Size of thread pool that is in charge of processing DR (Data Replication) messages. + * <p> + * If not provided, executor service will have size {@link #DFLT_DR_THREAD_CNT}. + * + * @return Thread pool size to be used for DR message processing. + */ + public int getDrThreadPoolSize() { + return drPoolSize; + } + + /** * Default size of thread pool that is in charge of processing utility cache messages. * <p> * If not provided, executor service will have size {@link #DFLT_SYSTEM_CORE_THREAD_CNT}. @@ -791,6 +811,18 @@ public class IgniteConfiguration { } /** + * Set thread pool size that will be used to send and receive DR (Data Replication) messages. + * + * @param poolSize Set thread pool size. + * @return {@code this} for chaining. + */ + public IgniteConfiguration setDrThreadPoolSize(int poolSize) { + drPoolSize = poolSize; + + return this; + } + + /** * Sets default thread pool size that will be used to process utility cache messages. * * @param poolSize Default executor service size to use for utility cache messages. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/999b5f03/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java index d6542f3..c354609 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java @@ -465,6 +465,11 @@ public interface GridKernalContext extends Iterable<GridComponent> { public ExecutorService getExecutorService(); /** + * @return Thread pool implementation to be used in grid data replication. + */ + public ExecutorService getDrExecutorService(); + + /** * Executor service that is in charge of processing internal system messages. * * @return Thread pool implementation to be used in grid for internal system messages. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/999b5f03/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java index 65107a7..ecc02a7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java @@ -254,6 +254,10 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable /** */ @GridToStringExclude + protected ExecutorService drExecSvc; + + /** */ + @GridToStringExclude protected ExecutorService sysExecSvc; /** */ @@ -341,6 +345,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable ExecutorService mgmtExecSvc, ExecutorService igfsExecSvc, ExecutorService restExecSvc, + ExecutorService drExecSvc, List<PluginProvider> plugins) throws IgniteCheckedException { assert grid != null; assert cfg != null; @@ -357,6 +362,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable this.mgmtExecSvc = mgmtExecSvc; this.igfsExecSvc = igfsExecSvc; this.restExecSvc = restExecSvc; + this.drExecSvc = drExecSvc; marshCtx = new MarshallerContextImpl(plugins); @@ -838,6 +844,11 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable } /** {@inheritDoc} */ + @Override public ExecutorService getDrExecutorService() { + return drExecSvc; + } + + /** {@inheritDoc} */ @Override public ExecutorService getSystemExecutorService() { return sysExecSvc; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/999b5f03/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index e19d3d3..2c6596b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -77,7 +77,6 @@ import javax.management.*; import java.io.*; import java.lang.management.*; import java.lang.reflect.*; -import java.security.*; import java.text.*; import java.util.*; import java.util.concurrent.*; @@ -528,6 +527,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { * @param mgmtExecSvc Management executor service. * @param igfsExecSvc IGFS executor service. * @param restExecSvc Reset executor service. + * @param drExecSvc DR (Data Replication) executor service. * @param errHnd Error handler to use for notification about startup problems. * @throws IgniteCheckedException Thrown in case of any errors. */ @@ -541,6 +541,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { ExecutorService mgmtExecSvc, ExecutorService igfsExecSvc, ExecutorService restExecSvc, + ExecutorService drExecSvc, GridAbsClosure errHnd) throws IgniteCheckedException { @@ -646,6 +647,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { mgmtExecSvc, igfsExecSvc, restExecSvc, + drExecSvc, plugins); cfg.getMarshaller().setContext(ctx.marshallerContext()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/999b5f03/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java index 5cbe377..b502f14 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java @@ -1296,6 +1296,9 @@ public class IgnitionEx { /** Marshaller cache executor service. */ private ExecutorService marshCacheExecSvc; + /** DR cache executor service. */ + private ExecutorService drExecSvc; + /** Grid state. */ private volatile IgniteState state = STOPPED; @@ -1524,6 +1527,13 @@ public class IgnitionEx { myCfg.getMarshallerCacheKeepAliveTime(), new LinkedBlockingQueue<Runnable>(DFLT_SYSTEM_THREADPOOL_QUEUE_CAP)); + drExecSvc = new IgniteThreadPoolExecutor( + "data-replication-" + cfg.getGridName(), + cfg.getDrThreadPoolSize(), + cfg.getDrThreadPoolSize(), + DFLT_PUBLIC_KEEP_ALIVE_TIME, + new LinkedBlockingQueue<Runnable>(DFLT_DR_THREADPOOL_QUEUE_CAP)); + // Register Ignite MBean for current grid instance. registerFactoryMbean(myCfg.getMBeanServer()); @@ -1536,7 +1546,7 @@ public class IgnitionEx { grid = grid0; grid0.start(myCfg, utilityCacheExecSvc, marshCacheExecSvc, execSvc, sysExecSvc, p2pExecSvc, mgmtExecSvc, - igfsExecSvc, restExecSvc, + igfsExecSvc, restExecSvc, drExecSvc, new CA() { @Override public void apply() { startLatch.countDown(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/999b5f03/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index 4382731..b522a1d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -83,6 +83,9 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa /** Affinity assignment executor service. */ private ExecutorService affPool; + /** DR (Data Replication) executor service. */ + private ExecutorService drPool; + /** Utility cache pool. */ private ExecutorService utilityCachePool; @@ -185,6 +188,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa p2pPool = ctx.getPeerClassLoadingExecutorService(); sysPool = ctx.getSystemExecutorService(); mgmtPool = ctx.getManagementExecutorService(); + drPool = ctx.getDrExecutorService(); utilityCachePool = ctx.utilityCachePool(); marshCachePool = ctx.marshallerCachePool(); affPool = new IgniteThreadPoolExecutor( @@ -531,7 +535,9 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa case MANAGEMENT_POOL: case AFFINITY_POOL: case UTILITY_CACHE_POOL: - case MARSH_CACHE_POOL: { + case MARSH_CACHE_POOL: + case DR_POOL: + { if (msg.isOrdered()) processOrderedMessage(nodeId, msg, plc, msgC); else @@ -539,6 +545,10 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa break; } + + default: + throw new IllegalStateException("Failed to process message dues to " + + "unknown policy, [policy=" + msg.policy() + ']'); } } catch (IgniteCheckedException e) { @@ -568,6 +578,9 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa case AFFINITY_POOL: return affPool; + case DR_POOL: + return drPool; + case UTILITY_CACHE_POOL: assert utilityCachePool != null : "Utility cache pool is not configured."; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/999b5f03/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java index 6e45043..ad74baa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java @@ -43,7 +43,10 @@ public enum GridIoPolicy { UTILITY_CACHE_POOL, /** Marshaller cache execution pool. */ - MARSH_CACHE_POOL; + MARSH_CACHE_POOL, + + /** DR internal messaging pool. */ + DR_POOL; /** Enum values. */ private static final GridIoPolicy[] VALS = values(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/999b5f03/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java index 24502da..4485d5e 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java @@ -46,6 +46,7 @@ public class GridTestKernalContext extends GridKernalContextImpl { null, null, null, + null, U.allPluginProviders()); GridTestUtils.setFieldValue(grid(), "cfg", config()); @@ -102,4 +103,13 @@ public class GridTestKernalContext extends GridKernalContextImpl { public void setExecutorService(ExecutorService execSvc){ this.execSvc = execSvc; } + + /** + * Sets executor service. + * + * @param execSvc Executor service + */ + public void setDrExecutorService(ExecutorService execSvc){ + this.drExecSvc = execSvc; + } }