#ignite-239: small refactoring.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/c1b4ad70 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/c1b4ad70 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/c1b4ad70 Branch: refs/heads/ignite-51 Commit: c1b4ad70cf1d1d954981c1056e4e66b021c465c7 Parents: 7e2b2e2 Author: ivasilinets <ivasilin...@gridgain.com> Authored: Fri Feb 20 18:29:11 2015 +0300 Committer: ivasilinets <ivasilin...@gridgain.com> Committed: Fri Feb 20 18:29:11 2015 +0300 ---------------------------------------------------------------------- .../configuration/IgniteConfiguration.java | 4 +- .../org/apache/ignite/internal/IgnitionEx.java | 352 ++++++++++--------- 2 files changed, 179 insertions(+), 177 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c1b4ad70/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java index e554aea..d44b057 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java @@ -384,7 +384,6 @@ public class IgniteConfiguration { * * @param cfg Grid configuration to copy from. */ - @SuppressWarnings("deprecation") public IgniteConfiguration(IgniteConfiguration cfg) { assert cfg != null; @@ -455,7 +454,8 @@ public class IgniteConfiguration { sysPoolSize = cfg.getSystemThreadPoolSize(); timeSrvPortBase = cfg.getTimeServerPortBase(); timeSrvPortRange = cfg.getTimeServerPortRange(); - txCfg = cfg.getTransactionConfiguration(); + txCfg = cfg.getTransactionConfiguration() != null ? + new TransactionConfiguration(cfg.getTransactionConfiguration()) : null; userAttrs = cfg.getUserAttributes(); waitForSegOnStart = cfg.isWaitForSegmentOnStart(); warmupClos = cfg.getWarmupClosure(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c1b4ad70/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java index 34f29a7..9c196f4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java @@ -1333,6 +1333,8 @@ public class IgnitionEx { if (nodeId == null) nodeId = UUID.randomUUID(); + myCfg.setNodeId(nodeId); + IgniteLogger cfgLog = initLogger(cfg.getGridLogger(), nodeId); assert cfgLog != null; @@ -1342,6 +1344,8 @@ public class IgnitionEx { // Initialize factory's log. log = cfgLog.getLogger(G.class); + myCfg.setGridLogger(cfgLog); + // Check Ignite home folder (after log is available). if (ggHome != null) { File ggHomeFile = new File(ggHome); @@ -1352,13 +1356,6 @@ public class IgnitionEx { myCfg.setIgniteHome(ggHome); - myCfg.setTransactionConfiguration(new TransactionConfiguration(cfg.getTransactionConfiguration())); - - ConnectorConfiguration clientCfg = cfg.getConnectorConfiguration(); - - if (clientCfg != null) - clientCfg = new ConnectorConfiguration(clientCfg); - // Local host. String locHost = IgniteSystemProperties.getString(IGNITE_LOCAL_HOST); @@ -1391,69 +1388,10 @@ public class IgnitionEx { } } - Map<String, ?> attrs = cfg.getUserAttributes(); - - if (attrs == null) - attrs = Collections.emptyMap(); - - MBeanServer mbSrv = cfg.getMBeanServer(); - - Marshaller marsh = cfg.getMarshaller(); - - String[] p2pExclude = cfg.getPeerClassLoadingLocalClassPathExclude(); - - - execSvc = new IgniteThreadPoolExecutor( - "pub-" + cfg.getGridName(), - cfg.getPublicThreadPoolSize(), - cfg.getPublicThreadPoolSize(), - DFLT_PUBLIC_KEEP_ALIVE_TIME, - new LinkedBlockingQueue<Runnable>(DFLT_PUBLIC_THREADPOOL_QUEUE_CAP)); - - // Pre-start all threads as they are guaranteed to be needed. - ((ThreadPoolExecutor) execSvc).prestartAllCoreThreads(); - - // Note that since we use 'LinkedBlockingQueue', number of - // maximum threads has no effect. - sysExecSvc = new IgniteThreadPoolExecutor( - "sys-" + cfg.getGridName(), - cfg.getSystemThreadPoolSize(), - cfg.getSystemThreadPoolSize(), - DFLT_SYSTEM_KEEP_ALIVE_TIME, - new LinkedBlockingQueue<Runnable>(DFLT_SYSTEM_THREADPOOL_QUEUE_CAP)); - - // Pre-start all threads as they are guaranteed to be needed. - ((ThreadPoolExecutor) sysExecSvc).prestartAllCoreThreads(); - - // Note that since we use 'LinkedBlockingQueue', number of - // maximum threads has no effect. - // Note, that we do not pre-start threads here as management pool may - // not be needed. - mgmtExecSvc = new IgniteThreadPoolExecutor( - "mgmt-" + cfg.getGridName(), - cfg.getManagementThreadPoolSize(), - cfg.getManagementThreadPoolSize(), - 0, - new LinkedBlockingQueue<Runnable>()); - - // Note that since we use 'LinkedBlockingQueue', number of - // maximum threads has no effect. - // Note, that we do not pre-start threads here as class loading pool may - // not be needed. - p2pExecSvc = new IgniteThreadPoolExecutor( - "p2p-" + cfg.getGridName(), - cfg.getPeerClassLoadingThreadPoolSize(), - cfg.getPeerClassLoadingThreadPoolSize(), - 0, - new LinkedBlockingQueue<Runnable>()); + ConnectorConfiguration clientCfg = cfg.getConnectorConfiguration(); - // Note that we do not pre-start threads here as igfs pool may not be needed. - igfsExecSvc = new IgniteThreadPoolExecutor( - "igfs-" + cfg.getGridName(), - cfg.getIgfsThreadPoolSize(), - cfg.getIgfsThreadPoolSize(), - 0, - new LinkedBlockingQueue<Runnable>()); + if (clientCfg != null) + clientCfg = new ConnectorConfiguration(clientCfg); if (clientCfg != null) { restExecSvc = new IgniteThreadPoolExecutor( @@ -1465,12 +1403,10 @@ public class IgnitionEx { ); } - utilityCacheExecSvc = new IgniteThreadPoolExecutor( - "utility-" + cfg.getGridName(), - DFLT_SYSTEM_CORE_THREAD_CNT, - DFLT_SYSTEM_MAX_THREAD_CNT, - DFLT_SYSTEM_KEEP_ALIVE_TIME, - new LinkedBlockingQueue<Runnable>(DFLT_SYSTEM_THREADPOOL_QUEUE_CAP)); + // REST configuration. + myCfg.setConnectorConfiguration(clientCfg); + + Marshaller marsh = cfg.getMarshaller(); if (marsh == null) { if (!U.isHotSpot()) { @@ -1500,12 +1436,18 @@ public class IgnitionEx { "Using GridOptimizedMarshaller on untested JVM."); } + myCfg.setMarshaller(marsh); + + Map<String, ?> attrs = cfg.getUserAttributes(); + + if (attrs == null) + attrs = Collections.emptyMap(); + myCfg.setUserAttributes(attrs); + + MBeanServer mbSrv = cfg.getMBeanServer(); + myCfg.setMBeanServer(mbSrv == null ? ManagementFactory.getPlatformMBeanServer() : mbSrv); - myCfg.setGridLogger(cfgLog); - myCfg.setMarshaller(marsh); - myCfg.setMarshalLocalJobs(cfg.isMarshalLocalJobs()); - myCfg.setNodeId(nodeId); IgfsConfiguration[] igfsCfgs = cfg.getIgfsConfiguration(); @@ -1529,14 +1471,13 @@ public class IgnitionEx { myCfg.setStreamerConfiguration(clone); } + String[] p2pExclude = cfg.getPeerClassLoadingLocalClassPathExclude(); + if (p2pExclude == null) p2pExclude = EMPTY_STR_ARR; myCfg.setPeerClassLoadingLocalClassPathExclude(p2pExclude); - // REST configuration. - myCfg.setConnectorConfiguration(clientCfg); - // Validate segmentation configuration. GridSegmentationPolicy segPlc = cfg.getSegmentationPolicy(); @@ -1547,85 +1488,7 @@ public class IgnitionEx { "on start?) [segPlc=" + segPlc + ", wait=false]"); } - - - /* - * Initialize default SPI implementations. - */ - CommunicationSpi commSpi = cfg.getCommunicationSpi(); - DiscoverySpi discoSpi = cfg.getDiscoverySpi(); - EventStorageSpi evtSpi = cfg.getEventStorageSpi(); - CollisionSpi colSpi = cfg.getCollisionSpi(); - DeploymentSpi deploySpi = cfg.getDeploymentSpi(); - CheckpointSpi[] cpSpi = cfg.getCheckpointSpi(); - FailoverSpi[] failSpi = cfg.getFailoverSpi(); - LoadBalancingSpi[] loadBalancingSpi = cfg.getLoadBalancingSpi(); - SwapSpaceSpi swapspaceSpi = cfg.getSwapSpaceSpi(); - IndexingSpi indexingSpi = cfg.getIndexingSpi(); - - - if (commSpi == null) - commSpi = new TcpCommunicationSpi(); - - if (discoSpi == null) - discoSpi = new TcpDiscoverySpi(); - - if (discoSpi instanceof TcpDiscoverySpi) { - TcpDiscoverySpi tcpDisco = (TcpDiscoverySpi)discoSpi; - - if (tcpDisco.getIpFinder() == null) - tcpDisco.setIpFinder(new TcpDiscoveryMulticastIpFinder()); - } - - if (evtSpi == null) - evtSpi = new MemoryEventStorageSpi(); - - if (colSpi == null) - colSpi = new NoopCollisionSpi(); - - if (deploySpi == null) - deploySpi = new LocalDeploymentSpi(); - - if (cpSpi == null) - cpSpi = new CheckpointSpi[] {new NoopCheckpointSpi()}; - - if (failSpi == null) - failSpi = new FailoverSpi[] {new AlwaysFailoverSpi()}; - - if (loadBalancingSpi == null) - loadBalancingSpi = new LoadBalancingSpi[] {new RoundRobinLoadBalancingSpi()}; - - if (swapspaceSpi == null) { - boolean needSwap = false; - - CacheConfiguration[] caches = cfg.getCacheConfiguration(); - - if (caches != null) { - for (CacheConfiguration c : caches) { - if (c.isSwapEnabled()) { - needSwap = true; - - break; - } - } - } - - swapspaceSpi = needSwap ? new FileSwapSpaceSpi() : new NoopSwapSpaceSpi(); - } - - if (indexingSpi == null) - indexingSpi = new NoopIndexingSpi(); - - myCfg.setCommunicationSpi(commSpi); - myCfg.setDiscoverySpi(discoSpi); - myCfg.setCheckpointSpi(cpSpi); - myCfg.setEventStorageSpi(evtSpi); - myCfg.setDeploymentSpi(deploySpi); - myCfg.setFailoverSpi(failSpi); - myCfg.setCollisionSpi(colSpi); - myCfg.setLoadBalancingSpi(loadBalancingSpi); - myCfg.setSwapSpaceSpi(swapspaceSpi); - myCfg.setIndexingSpi(indexingSpi); + copySpis(cfg, myCfg, startCtx.single()); CacheConfiguration[] cacheCfgs = cfg.getCacheConfiguration(); @@ -1633,12 +1496,12 @@ public class IgnitionEx { final boolean hasAtomics = cfg.getAtomicConfiguration() != null; - final boolean clientDisco = discoSpi instanceof TcpClientDiscoverySpi; + final boolean clientDisco = myCfg.getDiscoverySpi() instanceof TcpClientDiscoverySpi; CacheConfiguration[] copies; if (cacheCfgs != null && cacheCfgs.length > 0) { - if (!U.discoOrdered(discoSpi) && !U.relaxDiscoveryOrdered()) + if (!U.discoOrdered(myCfg.getDiscoverySpi()) && !U.relaxDiscoveryOrdered()) throw new IgniteCheckedException("Discovery SPI implementation does not support node ordering and " + "cannot be used with cache (use SPI with @GridDiscoverySpiOrderSupport annotation, " + "like GridTcpDiscoverySpi)"); @@ -1716,18 +1579,64 @@ public class IgnitionEx { // No-op. } - // Ensure that SPIs support multiple grid instances, if required. - if (!startCtx.single()) { - ensureMultiInstanceSupport(deploySpi); - ensureMultiInstanceSupport(commSpi); - ensureMultiInstanceSupport(discoSpi); - ensureMultiInstanceSupport(cpSpi); - ensureMultiInstanceSupport(evtSpi); - ensureMultiInstanceSupport(colSpi); - ensureMultiInstanceSupport(failSpi); - ensureMultiInstanceSupport(loadBalancingSpi); - ensureMultiInstanceSupport(swapspaceSpi); - } + execSvc = new IgniteThreadPoolExecutor( + "pub-" + cfg.getGridName(), + cfg.getPublicThreadPoolSize(), + cfg.getPublicThreadPoolSize(), + DFLT_PUBLIC_KEEP_ALIVE_TIME, + new LinkedBlockingQueue<Runnable>(DFLT_PUBLIC_THREADPOOL_QUEUE_CAP)); + + // Pre-start all threads as they are guaranteed to be needed. + ((ThreadPoolExecutor) execSvc).prestartAllCoreThreads(); + + // Note that since we use 'LinkedBlockingQueue', number of + // maximum threads has no effect. + sysExecSvc = new IgniteThreadPoolExecutor( + "sys-" + cfg.getGridName(), + cfg.getSystemThreadPoolSize(), + cfg.getSystemThreadPoolSize(), + DFLT_SYSTEM_KEEP_ALIVE_TIME, + new LinkedBlockingQueue<Runnable>(DFLT_SYSTEM_THREADPOOL_QUEUE_CAP)); + + // Pre-start all threads as they are guaranteed to be needed. + ((ThreadPoolExecutor) sysExecSvc).prestartAllCoreThreads(); + + // Note that since we use 'LinkedBlockingQueue', number of + // maximum threads has no effect. + // Note, that we do not pre-start threads here as management pool may + // not be needed. + mgmtExecSvc = new IgniteThreadPoolExecutor( + "mgmt-" + cfg.getGridName(), + cfg.getManagementThreadPoolSize(), + cfg.getManagementThreadPoolSize(), + 0, + new LinkedBlockingQueue<Runnable>()); + + // Note that since we use 'LinkedBlockingQueue', number of + // maximum threads has no effect. + // Note, that we do not pre-start threads here as class loading pool may + // not be needed. + p2pExecSvc = new IgniteThreadPoolExecutor( + "p2p-" + cfg.getGridName(), + cfg.getPeerClassLoadingThreadPoolSize(), + cfg.getPeerClassLoadingThreadPoolSize(), + 0, + new LinkedBlockingQueue<Runnable>()); + + // Note that we do not pre-start threads here as igfs pool may not be needed. + igfsExecSvc = new IgniteThreadPoolExecutor( + "igfs-" + cfg.getGridName(), + cfg.getIgfsThreadPoolSize(), + cfg.getIgfsThreadPoolSize(), + 0, + new LinkedBlockingQueue<Runnable>()); + + utilityCacheExecSvc = new IgniteThreadPoolExecutor( + "utility-" + cfg.getGridName(), + DFLT_SYSTEM_CORE_THREAD_CNT, + DFLT_SYSTEM_MAX_THREAD_CNT, + DFLT_SYSTEM_KEEP_ALIVE_TIME, + new LinkedBlockingQueue<Runnable>(DFLT_SYSTEM_THREADPOOL_QUEUE_CAP)); // Register Ignite MBean for current grid instance. registerFactoryMbean(myCfg.getMBeanServer()); @@ -1800,6 +1709,99 @@ public class IgnitionEx { } } + private void copySpis(IgniteConfiguration cfg, IgniteConfiguration myCfg, boolean singleGrid) + throws IgniteCheckedException { + /* + * Initialize default SPI implementations. + */ + CommunicationSpi commSpi = cfg.getCommunicationSpi(); + DiscoverySpi discoSpi = cfg.getDiscoverySpi(); + EventStorageSpi evtSpi = cfg.getEventStorageSpi(); + CollisionSpi colSpi = cfg.getCollisionSpi(); + DeploymentSpi deploySpi = cfg.getDeploymentSpi(); + CheckpointSpi[] cpSpi = cfg.getCheckpointSpi(); + FailoverSpi[] failSpi = cfg.getFailoverSpi(); + LoadBalancingSpi[] loadBalancingSpi = cfg.getLoadBalancingSpi(); + SwapSpaceSpi swapspaceSpi = cfg.getSwapSpaceSpi(); + IndexingSpi indexingSpi = cfg.getIndexingSpi(); + + if (commSpi == null) + commSpi = new TcpCommunicationSpi(); + + if (discoSpi == null) + discoSpi = new TcpDiscoverySpi(); + + if (discoSpi instanceof TcpDiscoverySpi) { + TcpDiscoverySpi tcpDisco = (TcpDiscoverySpi)discoSpi; + + if (tcpDisco.getIpFinder() == null) + tcpDisco.setIpFinder(new TcpDiscoveryMulticastIpFinder()); + } + + if (evtSpi == null) + evtSpi = new MemoryEventStorageSpi(); + + if (colSpi == null) + colSpi = new NoopCollisionSpi(); + + if (deploySpi == null) + deploySpi = new LocalDeploymentSpi(); + + if (cpSpi == null) + cpSpi = new CheckpointSpi[] {new NoopCheckpointSpi()}; + + if (failSpi == null) + failSpi = new FailoverSpi[] {new AlwaysFailoverSpi()}; + + if (loadBalancingSpi == null) + loadBalancingSpi = new LoadBalancingSpi[] {new RoundRobinLoadBalancingSpi()}; + + if (swapspaceSpi == null) { + boolean needSwap = false; + + CacheConfiguration[] caches = cfg.getCacheConfiguration(); + + if (caches != null) { + for (CacheConfiguration c : caches) { + if (c.isSwapEnabled()) { + needSwap = true; + + break; + } + } + } + + swapspaceSpi = needSwap ? new FileSwapSpaceSpi() : new NoopSwapSpaceSpi(); + } + + if (indexingSpi == null) + indexingSpi = new NoopIndexingSpi(); + + myCfg.setCommunicationSpi(commSpi); + myCfg.setDiscoverySpi(discoSpi); + myCfg.setCheckpointSpi(cpSpi); + myCfg.setEventStorageSpi(evtSpi); + myCfg.setDeploymentSpi(deploySpi); + myCfg.setFailoverSpi(failSpi); + myCfg.setCollisionSpi(colSpi); + myCfg.setLoadBalancingSpi(loadBalancingSpi); + myCfg.setSwapSpaceSpi(swapspaceSpi); + myCfg.setIndexingSpi(indexingSpi); + + // Ensure that SPIs support multiple grid instances, if required. + if (!singleGrid) { + ensureMultiInstanceSupport(deploySpi); + ensureMultiInstanceSupport(commSpi); + ensureMultiInstanceSupport(discoSpi); + ensureMultiInstanceSupport(cpSpi); + ensureMultiInstanceSupport(evtSpi); + ensureMultiInstanceSupport(colSpi); + ensureMultiInstanceSupport(failSpi); + ensureMultiInstanceSupport(loadBalancingSpi); + ensureMultiInstanceSupport(swapspaceSpi); + } + } + /** * @param cfgLog Configured logger. * @param nodeId Local node ID.