# Renaming
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/50d1554c Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/50d1554c Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/50d1554c Branch: refs/heads/master Commit: 50d1554cf9e98cc3d5abba8ef37dd1950eeec6e9 Parents: 1ef8f69 Author: sboikov <sboi...@gridgain.com> Authored: Fri Dec 5 16:58:51 2014 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Fri Dec 5 16:58:51 2014 +0300 ---------------------------------------------------------------------- .../java/org/apache/ignite/IgniteCompute.java | 3 +- .../compute/ComputeJobBeforeFailover.java | 4 +- .../ignite/compute/ComputeJobContext.java | 3 +- .../org/apache/ignite/compute/ComputeTask.java | 3 +- .../apache/ignite/compute/ComputeTaskSpis.java | 3 +- .../configuration/IgniteConfiguration.java | 26 +- .../jobstealing/JobStealingCollisionSpi.java | 3 +- .../org/gridgain/grid/kernal/GridGainEx.java | 8 +- .../eventstorage/GridEventStorageManager.java | 2 +- .../failover/GridFailoverContextImpl.java | 2 +- .../managers/failover/GridFailoverManager.java | 2 +- .../grid/spi/eventstorage/EventStorageSpi.java | 57 +++ .../spi/eventstorage/GridEventStorageSpi.java | 57 --- .../memory/GridMemoryEventStorageSpi.java | 272 --------------- .../memory/GridMemoryEventStorageSpiMBean.java | 57 --- .../memory/MemoryEventStorageSpi.java | 272 +++++++++++++++ .../memory/MemoryEventStorageSpiMBean.java | 57 +++ .../grid/spi/failover/FailoverContext.java | 47 +++ .../gridgain/grid/spi/failover/FailoverSpi.java | 60 ++++ .../grid/spi/failover/GridFailoverContext.java | 47 --- .../grid/spi/failover/GridFailoverSpi.java | 63 ---- .../spi/failover/always/AlwaysFailoverSpi.java | 238 +++++++++++++ .../failover/always/AlwaysFailoverSpiMBean.java | 36 ++ .../failover/always/GridAlwaysFailoverSpi.java | 238 ------------- .../always/GridAlwaysFailoverSpiMBean.java | 36 -- .../jobstealing/GridJobStealingFailoverSpi.java | 344 ------------------- .../GridJobStealingFailoverSpiMBean.java | 46 --- .../jobstealing/JobStealingFailoverSpi.java | 343 ++++++++++++++++++ .../JobStealingFailoverSpiMBean.java | 46 +++ .../failover/never/GridNeverFailoverSpi.java | 94 ----- .../never/GridNeverFailoverSpiMBean.java | 21 -- .../spi/failover/never/NeverFailoverSpi.java | 94 +++++ .../failover/never/NeverFailoverSpiMBean.java | 21 ++ .../src/test/config/job-loadtest/client.xml | 2 +- .../src/test/config/job-loadtest/server.xml | 2 +- .../core/src/test/config/load/dsi-load-base.xml | 2 +- ...bStealingCollisionSpiAttributesSelfTest.java | 2 +- ...alingCollisionSpiCustomTopologySelfTest.java | 2 +- .../GridJobStealingCollisionSpiSelfTest.java | 2 +- ...obStealingCollisionSpiStartStopSelfTest.java | 2 +- .../GridAlwaysFailoverSpiFailSelfTest.java | 4 +- .../GridFailoverCustomTopologySelfTest.java | 4 +- .../grid/kernal/GridFailoverSelfTest.java | 2 +- .../GridFailoverTaskWithPredicateSelfTest.java | 4 +- .../kernal/GridFailoverTopologySelfTest.java | 4 +- .../grid/kernal/GridJobStealingSelfTest.java | 4 +- .../GridJobStealingZeroActiveJobsSelfTest.java | 4 +- .../grid/kernal/GridMultipleSpisSelfTest.java | 4 +- .../GridMultithreadedJobStealingSelfTest.java | 4 +- .../grid/kernal/GridSpiExceptionSelfTest.java | 2 +- .../grid/kernal/GridStopWithWaitSelfTest.java | 2 +- .../managers/GridManagerStopSelfTest.java | 4 +- .../GridCacheGroupLockFailoverSelfTest.java | 4 +- .../cache/GridCachePutAllFailoverSelfTest.java | 4 +- .../GridCachePreloadEventsAbstractSelfTest.java | 2 +- .../GridCacheReplicatedPreloadSelfTest.java | 2 +- .../p2p/GridAbstractMultinodeRedeployTest.java | 2 +- .../session/GridSessionJobFailoverSelfTest.java | 2 +- ...MemoryEventStorageMultiThreadedSelfTest.java | 4 +- ...GridMemoryEventStorageSpiConfigSelfTest.java | 8 +- .../GridMemoryEventStorageSpiSelfTest.java | 10 +- ...dMemoryEventStorageSpiStartStopSelfTest.java | 4 +- .../spi/failover/GridFailoverTestContext.java | 2 +- .../GridAlwaysFailoverSpiConfigSelfTest.java | 6 +- .../always/GridAlwaysFailoverSpiSelfTest.java | 12 +- .../GridAlwaysFailoverSpiStartStopSelfTest.java | 4 +- ...ridJobStealingFailoverSpiConfigSelfTest.java | 6 +- ...idJobStealingFailoverSpiOneNodeSelfTest.java | 4 +- .../GridJobStealingFailoverSpiSelfTest.java | 8 +- ...JobStealingFailoverSpiStartStopSelfTest.java | 4 +- .../never/GridNeverFailoverSpiSelfTest.java | 4 +- .../GridNeverFailoverSpiStartStopSelfTest.java | 4 +- .../direct/stealing/GridStealingLoadTest.java | 4 +- 73 files changed, 1374 insertions(+), 1387 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/50d1554c/modules/core/src/main/java/org/apache/ignite/IgniteCompute.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCompute.java b/modules/core/src/main/java/org/apache/ignite/IgniteCompute.java index 3d0185c..309a547 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteCompute.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteCompute.java @@ -13,7 +13,6 @@ import org.apache.ignite.cluster.*; import org.apache.ignite.compute.*; import org.apache.ignite.lang.*; import org.gridgain.grid.*; -import org.gridgain.grid.spi.failover.*; import org.gridgain.grid.spi.loadbalancing.*; import org.jetbrains.annotations.*; @@ -64,7 +63,7 @@ import java.util.concurrent.*; * or has rejected execution due to lack of resources. By default, in case of failover, next * load balanced node will be picked for job execution. Also jobs will never be re-routed to the * nodes they have failed on. This behavior can be changed by configuring any of the existing or a custom - * {@link GridFailoverSpi} in grid configuration. + * {@link org.gridgain.grid.spi.failover.FailoverSpi} in grid configuration. * <h1 class="header">Resource Injection</h1> * All compute jobs, including closures, runnables, callables, and tasks can be injected with * grid resources. Both, field and method based injections are supported. The following grid http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/50d1554c/modules/core/src/main/java/org/apache/ignite/compute/ComputeJobBeforeFailover.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/compute/ComputeJobBeforeFailover.java b/modules/core/src/main/java/org/apache/ignite/compute/ComputeJobBeforeFailover.java index ae2f9dc..48067c6 100644 --- a/modules/core/src/main/java/org/apache/ignite/compute/ComputeJobBeforeFailover.java +++ b/modules/core/src/main/java/org/apache/ignite/compute/ComputeJobBeforeFailover.java @@ -9,13 +9,11 @@ package org.apache.ignite.compute; -import org.gridgain.grid.spi.failover.*; - import java.lang.annotation.*; /** * This annotation allows to call a method right before job is submitted to - * {@link GridFailoverSpi}. In this method job can re-create necessary state that was + * {@link org.gridgain.grid.spi.failover.FailoverSpi}. In this method job can re-create necessary state that was * cleared, for example, in method with {@link ComputeJobAfterSend} annotation. * <p> * This annotation can be applied to methods of {@link ComputeJob} instances only. It is http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/50d1554c/modules/core/src/main/java/org/apache/ignite/compute/ComputeJobContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/compute/ComputeJobContext.java b/modules/core/src/main/java/org/apache/ignite/compute/ComputeJobContext.java index b1107d2..a03641d 100644 --- a/modules/core/src/main/java/org/apache/ignite/compute/ComputeJobContext.java +++ b/modules/core/src/main/java/org/apache/ignite/compute/ComputeJobContext.java @@ -10,7 +10,6 @@ package org.apache.ignite.compute; import org.apache.ignite.lang.*; -import org.gridgain.grid.spi.failover.*; import org.jetbrains.annotations.*; import java.util.*; @@ -36,7 +35,7 @@ import java.util.concurrent.*; * For example, if you need to cancel an actively running job from {@link org.apache.ignite.spi.collision.CollisionSpi} * you may choose to set some context attribute on the job to mark the fact * that a job was cancelled by grid and not by a user. Context attributes can - * also be assigned in {@link GridFailoverSpi} prior to failing over a job. + * also be assigned in {@link org.gridgain.grid.spi.failover.FailoverSpi} prior to failing over a job. * <p> * From within {@link ComputeTask#result(ComputeJobResult, List)} or {@link ComputeTask#reduce(List)} methods, * job context is available via {@link ComputeJobResult#getJobContext()} method which gives user the http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/50d1554c/modules/core/src/main/java/org/apache/ignite/compute/ComputeTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/compute/ComputeTask.java b/modules/core/src/main/java/org/apache/ignite/compute/ComputeTask.java index 3497983..cb81e8c 100644 --- a/modules/core/src/main/java/org/apache/ignite/compute/ComputeTask.java +++ b/modules/core/src/main/java/org/apache/ignite/compute/ComputeTask.java @@ -11,7 +11,6 @@ package org.apache.ignite.compute; import org.apache.ignite.cluster.*; import org.gridgain.grid.*; -import org.gridgain.grid.spi.failover.*; import org.jetbrains.annotations.*; import java.io.*; @@ -73,7 +72,7 @@ import java.util.*; * <li> * If {@link ComputeJobResultPolicy#FAILOVER} policy is returned, then job will * be failed over to another node for execution. The node to which job will get - * failed over is decided by {@link GridFailoverSpi} SPI implementation. + * failed over is decided by {@link org.gridgain.grid.spi.failover.FailoverSpi} SPI implementation. * Note that if you use {@link ComputeTaskAdapter} adapter for {@code GridComputeTask} * implementation, then it will automatically fail jobs to another node for 2 * known failure cases: http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/50d1554c/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskSpis.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskSpis.java b/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskSpis.java index 05e43ef..1598d47 100644 --- a/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskSpis.java +++ b/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskSpis.java @@ -9,7 +9,6 @@ package org.apache.ignite.compute; -import org.gridgain.grid.spi.failover.*; import org.gridgain.grid.spi.loadbalancing.*; import java.lang.annotation.*; @@ -18,7 +17,7 @@ import java.lang.annotation.*; * This annotation allows task to specify what SPIs it wants to use. * Starting with {@code GridGain 2.1} you can start multiple instances * of {@link GridLoadBalancingSpi}, - * {@link GridFailoverSpi}, and {@link org.apache.ignite.spi.checkpoint.CheckpointSpi}. If you do that, + * {@link org.gridgain.grid.spi.failover.FailoverSpi}, and {@link org.apache.ignite.spi.checkpoint.CheckpointSpi}. If you do that, * you need to tell a task which SPI to use (by default it will use the fist * SPI in the list). */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/50d1554c/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java index 78e61b2..fc5349d 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java @@ -33,9 +33,7 @@ import org.apache.ignite.spi.communication.*; import org.apache.ignite.spi.deployment.*; import org.apache.ignite.spi.discovery.*; import org.gridgain.grid.spi.eventstorage.*; -import org.gridgain.grid.spi.eventstorage.memory.*; import org.gridgain.grid.spi.failover.*; -import org.gridgain.grid.spi.failover.always.*; import org.gridgain.grid.spi.indexing.*; import org.gridgain.grid.spi.loadbalancing.*; import org.gridgain.grid.spi.loadbalancing.roundrobin.*; @@ -348,7 +346,7 @@ public class IgniteConfiguration { private CommunicationSpi commSpi; /** Event storage SPI. */ - private GridEventStorageSpi evtSpi; + private EventStorageSpi evtSpi; /** Collision SPI. */ private CollisionSpi colSpi; @@ -366,7 +364,7 @@ public class IgniteConfiguration { private CheckpointSpi[] cpSpi; /** Failover SPI. */ - private GridFailoverSpi[] failSpi; + private FailoverSpi[] failSpi; /** Load balancing SPI. */ private GridLoadBalancingSpi[] loadBalancingSpi; @@ -1754,21 +1752,21 @@ public class IgniteConfiguration { /** * Should return fully configured event SPI implementation. If not provided, - * {@link GridMemoryEventStorageSpi} will be used. + * {@link org.gridgain.grid.spi.eventstorage.memory.MemoryEventStorageSpi} will be used. * * @return Grid event SPI implementation or {@code null} to use default implementation. */ - public GridEventStorageSpi getEventStorageSpi() { + public EventStorageSpi getEventStorageSpi() { return evtSpi; } /** - * Sets fully configured instance of {@link GridEventStorageSpi}. + * Sets fully configured instance of {@link org.gridgain.grid.spi.eventstorage.EventStorageSpi}. * - * @param evtSpi Fully configured instance of {@link GridEventStorageSpi}. + * @param evtSpi Fully configured instance of {@link org.gridgain.grid.spi.eventstorage.EventStorageSpi}. * @see IgniteConfiguration#getEventStorageSpi() */ - public void setEventStorageSpi(GridEventStorageSpi evtSpi) { + public void setEventStorageSpi(EventStorageSpi evtSpi) { this.evtSpi = evtSpi; } @@ -2055,22 +2053,22 @@ public class IgniteConfiguration { /** * Should return fully configured failover SPI implementation. If not provided, - * {@link GridAlwaysFailoverSpi} will be used. + * {@link org.gridgain.grid.spi.failover.always.AlwaysFailoverSpi} will be used. * * @return Grid failover SPI implementation or {@code null} to use default implementation. */ - public GridFailoverSpi[] getFailoverSpi() { + public FailoverSpi[] getFailoverSpi() { return failSpi; } /** - * Sets fully configured instance of {@link GridFailoverSpi}. + * Sets fully configured instance of {@link org.gridgain.grid.spi.failover.FailoverSpi}. * - * @param failSpi Fully configured instance of {@link GridFailoverSpi} or + * @param failSpi Fully configured instance of {@link org.gridgain.grid.spi.failover.FailoverSpi} or * {@code null} if no SPI provided. * @see IgniteConfiguration#getFailoverSpi() */ - public void setFailoverSpi(GridFailoverSpi... failSpi) { + public void setFailoverSpi(FailoverSpi... failSpi) { this.failSpi = failSpi; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/50d1554c/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingCollisionSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingCollisionSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingCollisionSpi.java index 47f69de..0e356af 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingCollisionSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingCollisionSpi.java @@ -18,7 +18,6 @@ import org.apache.ignite.spi.*; import org.gridgain.grid.kernal.managers.communication.*; import org.gridgain.grid.kernal.managers.eventstorage.*; import org.apache.ignite.spi.collision.*; -import org.gridgain.grid.spi.failover.jobstealing.*; import org.gridgain.grid.util.tostring.*; import org.gridgain.grid.util.typedef.*; import org.gridgain.grid.util.typedef.internal.*; @@ -55,7 +54,7 @@ import static org.apache.ignite.events.IgniteEventType.*; * <p> * <i> * Note that this SPI must always be used in conjunction with - * {@link GridJobStealingFailoverSpi}. + * {@link org.gridgain.grid.spi.failover.jobstealing.JobStealingFailoverSpi}. * Also note that job metrics update should be enabled in order for this SPI * to work properly (i.e. {@link org.apache.ignite.configuration.IgniteConfiguration#getMetricsUpdateFrequency()} * should be set to {@code 0} or greater value). http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/50d1554c/modules/core/src/main/java/org/gridgain/grid/kernal/GridGainEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/GridGainEx.java b/modules/core/src/main/java/org/gridgain/grid/kernal/GridGainEx.java index 1442ec9..716c435 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/GridGainEx.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/GridGainEx.java @@ -1465,13 +1465,13 @@ public class GridGainEx { CommunicationSpi commSpi = cfg.getCommunicationSpi(); DiscoverySpi discoSpi = cfg.getDiscoverySpi(); - GridEventStorageSpi evtSpi = cfg.getEventStorageSpi(); + EventStorageSpi evtSpi = cfg.getEventStorageSpi(); CollisionSpi colSpi = cfg.getCollisionSpi(); AuthenticationSpi authSpi = cfg.getAuthenticationSpi(); GridSecureSessionSpi sesSpi = cfg.getSecureSessionSpi(); DeploymentSpi deploySpi = cfg.getDeploymentSpi(); CheckpointSpi[] cpSpi = cfg.getCheckpointSpi(); - GridFailoverSpi[] failSpi = cfg.getFailoverSpi(); + FailoverSpi[] failSpi = cfg.getFailoverSpi(); GridLoadBalancingSpi[] loadBalancingSpi = cfg.getLoadBalancingSpi(); GridSwapSpaceSpi swapspaceSpi = cfg.getSwapSpaceSpi(); GridIndexingSpi[] indexingSpi = cfg.getIndexingSpi(); @@ -1680,7 +1680,7 @@ public class GridGainEx { } if (evtSpi == null) - evtSpi = new GridMemoryEventStorageSpi(); + evtSpi = new MemoryEventStorageSpi(); if (colSpi == null) colSpi = new NoopCollisionSpi(); @@ -1698,7 +1698,7 @@ public class GridGainEx { cpSpi = new CheckpointSpi[] {new NoopCheckpointSpi()}; if (failSpi == null) - failSpi = new GridFailoverSpi[] {new GridAlwaysFailoverSpi()}; + failSpi = new FailoverSpi[] {new AlwaysFailoverSpi()}; if (loadBalancingSpi == null) loadBalancingSpi = new GridLoadBalancingSpi[] {new GridRoundRobinLoadBalancingSpi()}; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/50d1554c/modules/core/src/main/java/org/gridgain/grid/kernal/managers/eventstorage/GridEventStorageManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/managers/eventstorage/GridEventStorageManager.java b/modules/core/src/main/java/org/gridgain/grid/kernal/managers/eventstorage/GridEventStorageManager.java index dbcca7d..b595b6c 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/managers/eventstorage/GridEventStorageManager.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/managers/eventstorage/GridEventStorageManager.java @@ -40,7 +40,7 @@ import static org.gridgain.grid.kernal.managers.communication.GridIoPolicy.*; /** * Grid event storage SPI manager. */ -public class GridEventStorageManager extends GridManagerAdapter<GridEventStorageSpi> { +public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi> { /** */ private static final int[] EMPTY = new int[0]; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/50d1554c/modules/core/src/main/java/org/gridgain/grid/kernal/managers/failover/GridFailoverContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/managers/failover/GridFailoverContextImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/managers/failover/GridFailoverContextImpl.java index 7e18357..a63dad9 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/managers/failover/GridFailoverContextImpl.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/managers/failover/GridFailoverContextImpl.java @@ -22,7 +22,7 @@ import java.util.*; /** * GridFailoverContext implementation. */ -public class GridFailoverContextImpl implements GridFailoverContext { +public class GridFailoverContextImpl implements FailoverContext { /** Grid task session. */ private final GridTaskSessionImpl taskSes; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/50d1554c/modules/core/src/main/java/org/gridgain/grid/kernal/managers/failover/GridFailoverManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/managers/failover/GridFailoverManager.java b/modules/core/src/main/java/org/gridgain/grid/kernal/managers/failover/GridFailoverManager.java index 367ab3a..b4fbde0 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/managers/failover/GridFailoverManager.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/managers/failover/GridFailoverManager.java @@ -20,7 +20,7 @@ import java.util.*; /** * Grid failover spi manager. */ -public class GridFailoverManager extends GridManagerAdapter<GridFailoverSpi> { +public class GridFailoverManager extends GridManagerAdapter<FailoverSpi> { /** * @param ctx Kernal context. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/50d1554c/modules/core/src/main/java/org/gridgain/grid/spi/eventstorage/EventStorageSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/eventstorage/EventStorageSpi.java b/modules/core/src/main/java/org/gridgain/grid/spi/eventstorage/EventStorageSpi.java new file mode 100644 index 0000000..da21ba9 --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/spi/eventstorage/EventStorageSpi.java @@ -0,0 +1,57 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.spi.eventstorage; + +import org.apache.ignite.events.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.spi.*; + +import java.util.*; + +/** + * This SPI provides local node events storage. SPI allows for recording local + * node events and querying recorded local events. Every node during its life-cycle + * goes through a serious of events such as task deployment, task execution, job + * execution, etc. For + * performance reasons GridGain is designed to store all locally produced events + * locally. These events can be later retrieved using either distributed query: + * <ul> + * <li>{@link org.apache.ignite.IgniteEvents#remoteQuery(org.apache.ignite.lang.IgnitePredicate, long, int...)}</li> + * </ul> + * or local only query: + * <ul> + * <li>{@link org.apache.ignite.IgniteEvents#localQuery(org.apache.ignite.lang.IgnitePredicate, int...)}</li> + * </ul> + * <b>NOTE:</b> this SPI (i.e. methods in this interface) should never be used directly. SPIs provide + * internal view on the subsystem and is used internally by GridGain kernal. In rare use cases when + * access to a specific implementation of this SPI is required - an instance of this SPI can be obtained + * via {@link org.apache.ignite.Ignite#configuration()} method to check its configuration properties or call other non-SPI + * methods. Note again that calling methods from this interface on the obtained instance can lead + * to undefined behavior and explicitly not supported. + * @see org.apache.ignite.events.IgniteEvent + */ +public interface EventStorageSpi extends IgniteSpi { + /** + * Queries locally-stored events only. Events could be filtered out + * by given predicate filter. + * + * @param p Event predicate filter. + * @return Collection of events. + */ + public <T extends IgniteEvent> Collection<T> localEvents(IgnitePredicate<T> p); + + /** + * Records single event. + * + * @param evt Event that should be recorded. + * @throws org.apache.ignite.spi.IgniteSpiException If event recording failed for any reason. + */ + public void record(IgniteEvent evt) throws IgniteSpiException; +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/50d1554c/modules/core/src/main/java/org/gridgain/grid/spi/eventstorage/GridEventStorageSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/eventstorage/GridEventStorageSpi.java b/modules/core/src/main/java/org/gridgain/grid/spi/eventstorage/GridEventStorageSpi.java deleted file mode 100644 index 6ed1c63..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/spi/eventstorage/GridEventStorageSpi.java +++ /dev/null @@ -1,57 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.spi.eventstorage; - -import org.apache.ignite.events.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.spi.*; - -import java.util.*; - -/** - * This SPI provides local node events storage. SPI allows for recording local - * node events and querying recorded local events. Every node during its life-cycle - * goes through a serious of events such as task deployment, task execution, job - * execution, etc. For - * performance reasons GridGain is designed to store all locally produced events - * locally. These events can be later retrieved using either distributed query: - * <ul> - * <li>{@link org.apache.ignite.IgniteEvents#remoteQuery(org.apache.ignite.lang.IgnitePredicate, long, int...)}</li> - * </ul> - * or local only query: - * <ul> - * <li>{@link org.apache.ignite.IgniteEvents#localQuery(org.apache.ignite.lang.IgnitePredicate, int...)}</li> - * </ul> - * <b>NOTE:</b> this SPI (i.e. methods in this interface) should never be used directly. SPIs provide - * internal view on the subsystem and is used internally by GridGain kernal. In rare use cases when - * access to a specific implementation of this SPI is required - an instance of this SPI can be obtained - * via {@link org.apache.ignite.Ignite#configuration()} method to check its configuration properties or call other non-SPI - * methods. Note again that calling methods from this interface on the obtained instance can lead - * to undefined behavior and explicitly not supported. - * @see org.apache.ignite.events.IgniteEvent - */ -public interface GridEventStorageSpi extends IgniteSpi { - /** - * Queries locally-stored events only. Events could be filtered out - * by given predicate filter. - * - * @param p Event predicate filter. - * @return Collection of events. - */ - public <T extends IgniteEvent> Collection<T> localEvents(IgnitePredicate<T> p); - - /** - * Records single event. - * - * @param evt Event that should be recorded. - * @throws org.apache.ignite.spi.IgniteSpiException If event recording failed for any reason. - */ - public void record(IgniteEvent evt) throws IgniteSpiException; -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/50d1554c/modules/core/src/main/java/org/gridgain/grid/spi/eventstorage/memory/GridMemoryEventStorageSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/eventstorage/memory/GridMemoryEventStorageSpi.java b/modules/core/src/main/java/org/gridgain/grid/spi/eventstorage/memory/GridMemoryEventStorageSpi.java deleted file mode 100644 index bae13fa..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/spi/eventstorage/memory/GridMemoryEventStorageSpi.java +++ /dev/null @@ -1,272 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.spi.eventstorage.memory; - -import org.apache.ignite.*; -import org.apache.ignite.events.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.resources.*; -import org.apache.ignite.spi.*; -import org.gridgain.grid.spi.eventstorage.*; -import org.gridgain.grid.util.typedef.*; -import org.gridgain.grid.util.typedef.internal.*; -import org.jdk8.backport.*; - -import java.util.*; - -import static org.apache.ignite.events.IgniteEventType.*; - -/** - * In-memory {@link GridEventStorageSpi} implementation. All events are - * kept in the FIFO queue. If no configuration is provided a default expiration - * {@link #DFLT_EXPIRE_AGE_MS} and default count {@link #DFLT_EXPIRE_COUNT} will - * be used. - * <p> - * It's recommended not to set huge size and unlimited TTL because this might - * lead to consuming a lot of memory and result in {@link OutOfMemoryError}. - * Both event expiration time and maximum queue size could be changed at - * runtime. - * <p> - * <h1 class="header">Configuration</h1> - * <h2 class="header">Mandatory</h2> - * This SPI has no mandatory configuration parameters. - * <h2 class="header">Optional</h2> - * The following configuration parameters are optional: - * <ul> - * <li>Event queue size (see {@link #setExpireCount(long)})</li> - * <li>Event time-to-live value (see {@link #setExpireAgeMs(long)})</li> - * <li>{@link #setFilter(org.apache.ignite.lang.IgnitePredicate)} - Event filter that should be used for decision to accept event.</li> - * </ul> - * <h2 class="header">Java Example</h2> - * GridMemoryEventStorageSpi is used by default and should be explicitly configured only - * if some SPI configuration parameters need to be overridden. Examples below insert own - * events queue size value that differs from default 10000. - * <pre name="code" class="java"> - * GridMemoryEventStorageSpi = new GridMemoryEventStorageSpi(); - * - * // Init own events size. - * spi.setExpireCount(2000); - * - * GridConfiguration cfg = new GridConfiguration(); - * - * // Override default event storage SPI. - * cfg.setEventStorageSpi(spi); - * - * // Starts grid. - * G.start(cfg); - * </pre> - * <h2 class="header">Spring Example</h2> - * GridMemoryEventStorageSpi can be configured from Spring XML configuration file: - * <pre name="code" class="xml"> - * <bean id="grid.custom.cfg" class="org.gridgain.grid.GridConfiguration" singleton="true"> - * ... - * <property name="discoverySpi"> - * <bean class="org.gridgain.grid.spi.eventStorage.memory.GridMemoryEventStorageSpi"> - * <property name="expireCount" value="2000"/> - * </bean> - * </property> - * ... - * </bean> - * </pre> - * <p> - * <img src="http://www.gridgain.com/images/spring-small.png"> - * <br> - * For information about Spring framework visit <a href="http://www.springframework.org/">www.springframework.org</a> - * @see GridEventStorageSpi - */ -@IgniteSpiMultipleInstancesSupport(true) -public class GridMemoryEventStorageSpi extends IgniteSpiAdapter implements GridEventStorageSpi, - GridMemoryEventStorageSpiMBean { - /** Default event time to live value in milliseconds (value is {@link Long#MAX_VALUE}). */ - public static final long DFLT_EXPIRE_AGE_MS = Long.MAX_VALUE; - - /** Default expire count (value is {@code 10000}). */ - public static final int DFLT_EXPIRE_COUNT = 10000; - - /** */ - @IgniteLoggerResource - private IgniteLogger log; - - /** Event time-to-live value in milliseconds. */ - private long expireAgeMs = DFLT_EXPIRE_AGE_MS; - - /** Maximum queue size. */ - private long expireCnt = DFLT_EXPIRE_COUNT; - - /** Events queue. */ - private ConcurrentLinkedDeque8<IgniteEvent> evts = new ConcurrentLinkedDeque8<>(); - - /** Configured event predicate filter. */ - private IgnitePredicate<IgniteEvent> filter; - - /** - * Gets filter for events to be recorded. - * - * @return Filter to use. - */ - public IgnitePredicate<IgniteEvent> getFilter() { - return filter; - } - - /** - * Sets filter for events to be recorded. - * - * @param filter Filter to use. - */ - @IgniteSpiConfiguration(optional = true) - public void setFilter(IgnitePredicate<IgniteEvent> filter) { - this.filter = filter; - } - - /** {@inheritDoc} */ - @Override public void spiStart(String gridName) throws IgniteSpiException { - // Start SPI start stopwatch. - startStopwatch(); - - assertParameter(expireCnt > 0, "expireCnt > 0"); - assertParameter(expireAgeMs > 0, "expireAgeMs > 0"); - - // Ack parameters. - if (log.isDebugEnabled()) { - log.debug(configInfo("expireAgeMs", expireAgeMs)); - log.debug(configInfo("expireCnt", expireCnt)); - } - - registerMBean(gridName, this, GridMemoryEventStorageSpiMBean.class); - - // Ack ok start. - if (log.isDebugEnabled()) - log.debug(startInfo()); - } - - /** {@inheritDoc} */ - @Override public void spiStop() throws IgniteSpiException { - unregisterMBean(); - - // Reset events. - evts.clear(); - - // Ack ok stop. - if (log.isDebugEnabled()) - log.debug(stopInfo()); - } - - /** - * Sets events expiration time. All events that exceed this value - * will be removed from the queue when next event comes. - * <p> - * If not provided, default value is {@link #DFLT_EXPIRE_AGE_MS}. - * - * @param expireAgeMs Expiration time in milliseconds. - */ - @IgniteSpiConfiguration(optional = true) - public void setExpireAgeMs(long expireAgeMs) { - this.expireAgeMs = expireAgeMs; - } - - /** - * Sets events queue size. Events will be filtered out when new request comes. - * <p> - * If not provided, default value {@link #DFLT_EXPIRE_COUNT} will be used. - * - * @param expireCnt Maximum queue size. - */ - @IgniteSpiConfiguration(optional = true) - public void setExpireCount(long expireCnt) { - this.expireCnt = expireCnt; - } - - /** {@inheritDoc} */ - @Override public long getExpireAgeMs() { - return expireAgeMs; - } - - /** {@inheritDoc} */ - @Override public long getExpireCount() { - return expireCnt; - } - - /** {@inheritDoc} */ - @Override public long getQueueSize() { - return evts.sizex(); - } - - /** {@inheritDoc} */ - @Override public void clearAll() { - evts.clear(); - } - - /** {@inheritDoc} */ - @Override public <T extends IgniteEvent> Collection<T> localEvents(IgnitePredicate<T> p) { - A.notNull(p, "p"); - - cleanupQueue(); - - return F.retain((Collection<T>)evts, true, p); - } - - /** {@inheritDoc} */ - @Override public void record(IgniteEvent evt) throws IgniteSpiException { - assert evt != null; - - // Filter out events. - if (filter == null || filter.apply(evt)) { - cleanupQueue(); - - evts.add(evt); - - // Make sure to filter out metrics updates to prevent log from flooding. - if (evt.type() != EVT_NODE_METRICS_UPDATED && log.isDebugEnabled()) - log.debug("Event recorded: " + evt); - } - } - - /** - * Method cleans up all events that either outnumber queue size - * or exceeds time-to-live value. It does none if someone else - * cleans up queue (lock is locked) or if there are queue readers - * (readersNum > 0). - */ - private void cleanupQueue() { - long now = U.currentTimeMillis(); - - long queueOversize = evts.sizex() - expireCnt; - - for (int i = 0; i < queueOversize && evts.sizex() > expireCnt; i++) { - IgniteEvent expired = evts.poll(); - - if (log.isDebugEnabled()) - log.debug("Event expired by count: " + expired); - } - - while (true) { - ConcurrentLinkedDeque8.Node<IgniteEvent> node = evts.peekx(); - - if (node == null) // Queue is empty. - break; - - IgniteEvent evt = node.item(); - - if (evt == null) // Competing with another thread. - continue; - - if (now - evt.timestamp() < expireAgeMs) - break; - - if (evts.unlinkx(node) && log.isDebugEnabled()) - log.debug("Event expired by age: " + node.item()); - } - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridMemoryEventStorageSpi.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/50d1554c/modules/core/src/main/java/org/gridgain/grid/spi/eventstorage/memory/GridMemoryEventStorageSpiMBean.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/eventstorage/memory/GridMemoryEventStorageSpiMBean.java b/modules/core/src/main/java/org/gridgain/grid/spi/eventstorage/memory/GridMemoryEventStorageSpiMBean.java deleted file mode 100644 index 97b35ad..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/spi/eventstorage/memory/GridMemoryEventStorageSpiMBean.java +++ /dev/null @@ -1,57 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.spi.eventstorage.memory; - -import org.apache.ignite.mbean.*; -import org.apache.ignite.spi.*; - -/** - * Management bean for {@link GridMemoryEventStorageSpi}. - * Beside properties defined for every SPI bean this one gives access to: - * <ul> - * <li>Event expiration time (see {@link #getExpireAgeMs()})</li> - * <li>Maximum queue size (see {@link #getExpireCount()})</li> - * <li>Method that removes all items from queue (see {@link #clearAll()})</li> - * </ul> - */ -@IgniteMBeanDescription("MBean that provides access to memory event storage SPI configuration.") -public interface GridMemoryEventStorageSpiMBean extends IgniteSpiManagementMBean { - /** - * Gets event time-to-live value. Implementation must guarantee - * that event would not be accessible if its lifetime exceeds this value. - * - * @return Event time-to-live. - */ - @IgniteMBeanDescription("Event time-to-live value.") - public long getExpireAgeMs(); - - /** - * Gets maximum event queue size. New incoming events will oust - * oldest ones if queue size exceeds this limit. - * - * @return Maximum event queue size. - */ - @IgniteMBeanDescription("Maximum event queue size.") - public long getExpireCount(); - - /** - * Gets current queue size of the event queue. - * - * @return Current queue size of the event queue. - */ - @IgniteMBeanDescription("Current event queue size.") - public long getQueueSize(); - - /** - * Removes all events from the event queue. - */ - @IgniteMBeanDescription("Removes all events from the event queue.") - public void clearAll(); -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/50d1554c/modules/core/src/main/java/org/gridgain/grid/spi/eventstorage/memory/MemoryEventStorageSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/eventstorage/memory/MemoryEventStorageSpi.java b/modules/core/src/main/java/org/gridgain/grid/spi/eventstorage/memory/MemoryEventStorageSpi.java new file mode 100644 index 0000000..3364320 --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/spi/eventstorage/memory/MemoryEventStorageSpi.java @@ -0,0 +1,272 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.spi.eventstorage.memory; + +import org.apache.ignite.*; +import org.apache.ignite.events.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.spi.*; +import org.gridgain.grid.spi.eventstorage.*; +import org.gridgain.grid.util.typedef.*; +import org.gridgain.grid.util.typedef.internal.*; +import org.jdk8.backport.*; + +import java.util.*; + +import static org.apache.ignite.events.IgniteEventType.*; + +/** + * In-memory {@link org.gridgain.grid.spi.eventstorage.EventStorageSpi} implementation. All events are + * kept in the FIFO queue. If no configuration is provided a default expiration + * {@link #DFLT_EXPIRE_AGE_MS} and default count {@link #DFLT_EXPIRE_COUNT} will + * be used. + * <p> + * It's recommended not to set huge size and unlimited TTL because this might + * lead to consuming a lot of memory and result in {@link OutOfMemoryError}. + * Both event expiration time and maximum queue size could be changed at + * runtime. + * <p> + * <h1 class="header">Configuration</h1> + * <h2 class="header">Mandatory</h2> + * This SPI has no mandatory configuration parameters. + * <h2 class="header">Optional</h2> + * The following configuration parameters are optional: + * <ul> + * <li>Event queue size (see {@link #setExpireCount(long)})</li> + * <li>Event time-to-live value (see {@link #setExpireAgeMs(long)})</li> + * <li>{@link #setFilter(org.apache.ignite.lang.IgnitePredicate)} - Event filter that should be used for decision to accept event.</li> + * </ul> + * <h2 class="header">Java Example</h2> + * GridMemoryEventStorageSpi is used by default and should be explicitly configured only + * if some SPI configuration parameters need to be overridden. Examples below insert own + * events queue size value that differs from default 10000. + * <pre name="code" class="java"> + * GridMemoryEventStorageSpi = new GridMemoryEventStorageSpi(); + * + * // Init own events size. + * spi.setExpireCount(2000); + * + * GridConfiguration cfg = new GridConfiguration(); + * + * // Override default event storage SPI. + * cfg.setEventStorageSpi(spi); + * + * // Starts grid. + * G.start(cfg); + * </pre> + * <h2 class="header">Spring Example</h2> + * GridMemoryEventStorageSpi can be configured from Spring XML configuration file: + * <pre name="code" class="xml"> + * <bean id="grid.custom.cfg" class="org.gridgain.grid.GridConfiguration" singleton="true"> + * ... + * <property name="discoverySpi"> + * <bean class="org.gridgain.grid.spi.eventStorage.memory.GridMemoryEventStorageSpi"> + * <property name="expireCount" value="2000"/> + * </bean> + * </property> + * ... + * </bean> + * </pre> + * <p> + * <img src="http://www.gridgain.com/images/spring-small.png"> + * <br> + * For information about Spring framework visit <a href="http://www.springframework.org/">www.springframework.org</a> + * @see org.gridgain.grid.spi.eventstorage.EventStorageSpi + */ +@IgniteSpiMultipleInstancesSupport(true) +public class MemoryEventStorageSpi extends IgniteSpiAdapter implements EventStorageSpi, + MemoryEventStorageSpiMBean { + /** Default event time to live value in milliseconds (value is {@link Long#MAX_VALUE}). */ + public static final long DFLT_EXPIRE_AGE_MS = Long.MAX_VALUE; + + /** Default expire count (value is {@code 10000}). */ + public static final int DFLT_EXPIRE_COUNT = 10000; + + /** */ + @IgniteLoggerResource + private IgniteLogger log; + + /** Event time-to-live value in milliseconds. */ + private long expireAgeMs = DFLT_EXPIRE_AGE_MS; + + /** Maximum queue size. */ + private long expireCnt = DFLT_EXPIRE_COUNT; + + /** Events queue. */ + private ConcurrentLinkedDeque8<IgniteEvent> evts = new ConcurrentLinkedDeque8<>(); + + /** Configured event predicate filter. */ + private IgnitePredicate<IgniteEvent> filter; + + /** + * Gets filter for events to be recorded. + * + * @return Filter to use. + */ + public IgnitePredicate<IgniteEvent> getFilter() { + return filter; + } + + /** + * Sets filter for events to be recorded. + * + * @param filter Filter to use. + */ + @IgniteSpiConfiguration(optional = true) + public void setFilter(IgnitePredicate<IgniteEvent> filter) { + this.filter = filter; + } + + /** {@inheritDoc} */ + @Override public void spiStart(String gridName) throws IgniteSpiException { + // Start SPI start stopwatch. + startStopwatch(); + + assertParameter(expireCnt > 0, "expireCnt > 0"); + assertParameter(expireAgeMs > 0, "expireAgeMs > 0"); + + // Ack parameters. + if (log.isDebugEnabled()) { + log.debug(configInfo("expireAgeMs", expireAgeMs)); + log.debug(configInfo("expireCnt", expireCnt)); + } + + registerMBean(gridName, this, MemoryEventStorageSpiMBean.class); + + // Ack ok start. + if (log.isDebugEnabled()) + log.debug(startInfo()); + } + + /** {@inheritDoc} */ + @Override public void spiStop() throws IgniteSpiException { + unregisterMBean(); + + // Reset events. + evts.clear(); + + // Ack ok stop. + if (log.isDebugEnabled()) + log.debug(stopInfo()); + } + + /** + * Sets events expiration time. All events that exceed this value + * will be removed from the queue when next event comes. + * <p> + * If not provided, default value is {@link #DFLT_EXPIRE_AGE_MS}. + * + * @param expireAgeMs Expiration time in milliseconds. + */ + @IgniteSpiConfiguration(optional = true) + public void setExpireAgeMs(long expireAgeMs) { + this.expireAgeMs = expireAgeMs; + } + + /** + * Sets events queue size. Events will be filtered out when new request comes. + * <p> + * If not provided, default value {@link #DFLT_EXPIRE_COUNT} will be used. + * + * @param expireCnt Maximum queue size. + */ + @IgniteSpiConfiguration(optional = true) + public void setExpireCount(long expireCnt) { + this.expireCnt = expireCnt; + } + + /** {@inheritDoc} */ + @Override public long getExpireAgeMs() { + return expireAgeMs; + } + + /** {@inheritDoc} */ + @Override public long getExpireCount() { + return expireCnt; + } + + /** {@inheritDoc} */ + @Override public long getQueueSize() { + return evts.sizex(); + } + + /** {@inheritDoc} */ + @Override public void clearAll() { + evts.clear(); + } + + /** {@inheritDoc} */ + @Override public <T extends IgniteEvent> Collection<T> localEvents(IgnitePredicate<T> p) { + A.notNull(p, "p"); + + cleanupQueue(); + + return F.retain((Collection<T>)evts, true, p); + } + + /** {@inheritDoc} */ + @Override public void record(IgniteEvent evt) throws IgniteSpiException { + assert evt != null; + + // Filter out events. + if (filter == null || filter.apply(evt)) { + cleanupQueue(); + + evts.add(evt); + + // Make sure to filter out metrics updates to prevent log from flooding. + if (evt.type() != EVT_NODE_METRICS_UPDATED && log.isDebugEnabled()) + log.debug("Event recorded: " + evt); + } + } + + /** + * Method cleans up all events that either outnumber queue size + * or exceeds time-to-live value. It does none if someone else + * cleans up queue (lock is locked) or if there are queue readers + * (readersNum > 0). + */ + private void cleanupQueue() { + long now = U.currentTimeMillis(); + + long queueOversize = evts.sizex() - expireCnt; + + for (int i = 0; i < queueOversize && evts.sizex() > expireCnt; i++) { + IgniteEvent expired = evts.poll(); + + if (log.isDebugEnabled()) + log.debug("Event expired by count: " + expired); + } + + while (true) { + ConcurrentLinkedDeque8.Node<IgniteEvent> node = evts.peekx(); + + if (node == null) // Queue is empty. + break; + + IgniteEvent evt = node.item(); + + if (evt == null) // Competing with another thread. + continue; + + if (now - evt.timestamp() < expireAgeMs) + break; + + if (evts.unlinkx(node) && log.isDebugEnabled()) + log.debug("Event expired by age: " + node.item()); + } + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(MemoryEventStorageSpi.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/50d1554c/modules/core/src/main/java/org/gridgain/grid/spi/eventstorage/memory/MemoryEventStorageSpiMBean.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/eventstorage/memory/MemoryEventStorageSpiMBean.java b/modules/core/src/main/java/org/gridgain/grid/spi/eventstorage/memory/MemoryEventStorageSpiMBean.java new file mode 100644 index 0000000..eab8a92 --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/spi/eventstorage/memory/MemoryEventStorageSpiMBean.java @@ -0,0 +1,57 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.spi.eventstorage.memory; + +import org.apache.ignite.mbean.*; +import org.apache.ignite.spi.*; + +/** + * Management bean for {@link MemoryEventStorageSpi}. + * Beside properties defined for every SPI bean this one gives access to: + * <ul> + * <li>Event expiration time (see {@link #getExpireAgeMs()})</li> + * <li>Maximum queue size (see {@link #getExpireCount()})</li> + * <li>Method that removes all items from queue (see {@link #clearAll()})</li> + * </ul> + */ +@IgniteMBeanDescription("MBean that provides access to memory event storage SPI configuration.") +public interface MemoryEventStorageSpiMBean extends IgniteSpiManagementMBean { + /** + * Gets event time-to-live value. Implementation must guarantee + * that event would not be accessible if its lifetime exceeds this value. + * + * @return Event time-to-live. + */ + @IgniteMBeanDescription("Event time-to-live value.") + public long getExpireAgeMs(); + + /** + * Gets maximum event queue size. New incoming events will oust + * oldest ones if queue size exceeds this limit. + * + * @return Maximum event queue size. + */ + @IgniteMBeanDescription("Maximum event queue size.") + public long getExpireCount(); + + /** + * Gets current queue size of the event queue. + * + * @return Current queue size of the event queue. + */ + @IgniteMBeanDescription("Current event queue size.") + public long getQueueSize(); + + /** + * Removes all events from the event queue. + */ + @IgniteMBeanDescription("Removes all events from the event queue.") + public void clearAll(); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/50d1554c/modules/core/src/main/java/org/gridgain/grid/spi/failover/FailoverContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/failover/FailoverContext.java b/modules/core/src/main/java/org/gridgain/grid/spi/failover/FailoverContext.java new file mode 100644 index 0000000..7db8c1c --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/spi/failover/FailoverContext.java @@ -0,0 +1,47 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.spi.failover; + +import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; +import org.gridgain.grid.*; +import org.gridgain.grid.spi.loadbalancing.*; +import java.util.*; + +/** + * This interface defines a set of operations available to failover SPI + * one a given failed job. + */ +public interface FailoverContext { + /** + * Gets current task session. + * + * @return Grid task session. + */ + public ComputeTaskSession getTaskSession(); + + /** + * Gets failed result of job execution. + * + * @return Result of a failed job. + */ + public ComputeJobResult getJobResult(); + + /** + * Gets the next balanced node for failed job. Internally this method will + * delegate to load balancing SPI (see {@link GridLoadBalancingSpi} to + * determine the optimal node for execution. + * + * @param top Topology to pick balanced node from. + * @return The next balanced node. + * @throws GridException If anything failed. + */ + public ClusterNode getBalancedNode(List<ClusterNode> top) throws GridException; +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/50d1554c/modules/core/src/main/java/org/gridgain/grid/spi/failover/FailoverSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/failover/FailoverSpi.java b/modules/core/src/main/java/org/gridgain/grid/spi/failover/FailoverSpi.java new file mode 100644 index 0000000..393391d --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/spi/failover/FailoverSpi.java @@ -0,0 +1,60 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.spi.failover; + +import org.apache.ignite.cluster.*; +import org.apache.ignite.spi.*; + +import java.util.*; + +/** + * Failover SPI provides developer with ability to supply custom logic for handling + * failed execution of a grid job. Job execution can fail for a number of reasons: + * <ul> + * <li>Job execution threw an exception (runtime, assertion or error)</li> + * <li>Node on which job was execution left topology (crashed or stopped)</li> + * <li>Collision SPI on remote node cancelled a job before it got a chance to execute (job rejection).</li> + * </ul> + * In all cases failover SPI takes failed job (as failover context) and list of all + * grid nodes and provides another node on which the job execution will be retried. + * It is up to failover SPI to make sure that job is not mapped to the node it + * failed on. The failed node can be retrieved from + * {@link org.apache.ignite.compute.ComputeJobResult#getNode() GridFailoverContext.getJobResult().node()} + * method. + * <p> + * GridGain comes with the following built-in failover SPI implementations: + * <ul> + * <li>{@link org.gridgain.grid.spi.failover.never.NeverFailoverSpi}</li> + * <li>{@link org.gridgain.grid.spi.failover.always.AlwaysFailoverSpi}</li> + * <li>{@link org.gridgain.grid.spi.failover.jobstealing.JobStealingFailoverSpi}</li> + * </ul> + * <b>NOTE:</b> this SPI (i.e. methods in this interface) should never be used directly. SPIs provide + * internal view on the subsystem and is used internally by GridGain kernal. In rare use cases when + * access to a specific implementation of this SPI is required - an instance of this SPI can be obtained + * via {@link org.apache.ignite.Ignite#configuration()} method to check its configuration properties or call other non-SPI + * methods. Note again that calling methods from this interface on the obtained instance can lead + * to undefined behavior and explicitly not supported. + */ +public interface FailoverSpi extends IgniteSpi { + /** + * This method is called when method {@link org.apache.ignite.compute.ComputeTask#result(org.apache.ignite.compute.ComputeJobResult, List)} returns + * value {@link org.apache.ignite.compute.ComputeJobResultPolicy#FAILOVER} policy indicating that the result of + * job execution must be failed over. Implementation of this method should examine failover + * context and choose one of the grid nodes from supplied {@code topology} to retry job execution + * on it. For best performance it is advised that {@link FailoverContext#getBalancedNode(List)} + * method is used to select node for execution of failed job. + * + * @param ctx Failover context. + * @param top Collection of all grid nodes within task topology (may include failed node). + * @return New node to route this job to or {@code null} if new node cannot be picked. + * If job failover fails (returns {@code null}) the whole task will be failed. + */ + public ClusterNode failover(FailoverContext ctx, List<ClusterNode> top); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/50d1554c/modules/core/src/main/java/org/gridgain/grid/spi/failover/GridFailoverContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/failover/GridFailoverContext.java b/modules/core/src/main/java/org/gridgain/grid/spi/failover/GridFailoverContext.java deleted file mode 100644 index b89b1db..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/spi/failover/GridFailoverContext.java +++ /dev/null @@ -1,47 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.spi.failover; - -import org.apache.ignite.cluster.*; -import org.apache.ignite.compute.*; -import org.gridgain.grid.*; -import org.gridgain.grid.spi.loadbalancing.*; -import java.util.*; - -/** - * This interface defines a set of operations available to failover SPI - * one a given failed job. - */ -public interface GridFailoverContext { - /** - * Gets current task session. - * - * @return Grid task session. - */ - public ComputeTaskSession getTaskSession(); - - /** - * Gets failed result of job execution. - * - * @return Result of a failed job. - */ - public ComputeJobResult getJobResult(); - - /** - * Gets the next balanced node for failed job. Internally this method will - * delegate to load balancing SPI (see {@link GridLoadBalancingSpi} to - * determine the optimal node for execution. - * - * @param top Topology to pick balanced node from. - * @return The next balanced node. - * @throws GridException If anything failed. - */ - public ClusterNode getBalancedNode(List<ClusterNode> top) throws GridException; -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/50d1554c/modules/core/src/main/java/org/gridgain/grid/spi/failover/GridFailoverSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/failover/GridFailoverSpi.java b/modules/core/src/main/java/org/gridgain/grid/spi/failover/GridFailoverSpi.java deleted file mode 100644 index a0f8ac8..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/spi/failover/GridFailoverSpi.java +++ /dev/null @@ -1,63 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.spi.failover; - -import org.apache.ignite.cluster.*; -import org.apache.ignite.spi.*; -import org.gridgain.grid.spi.failover.always.*; -import org.gridgain.grid.spi.failover.jobstealing.*; -import org.gridgain.grid.spi.failover.never.*; - -import java.util.*; - -/** - * Failover SPI provides developer with ability to supply custom logic for handling - * failed execution of a grid job. Job execution can fail for a number of reasons: - * <ul> - * <li>Job execution threw an exception (runtime, assertion or error)</li> - * <li>Node on which job was execution left topology (crashed or stopped)</li> - * <li>Collision SPI on remote node cancelled a job before it got a chance to execute (job rejection).</li> - * </ul> - * In all cases failover SPI takes failed job (as failover context) and list of all - * grid nodes and provides another node on which the job execution will be retried. - * It is up to failover SPI to make sure that job is not mapped to the node it - * failed on. The failed node can be retrieved from - * {@link org.apache.ignite.compute.ComputeJobResult#getNode() GridFailoverContext.getJobResult().node()} - * method. - * <p> - * GridGain comes with the following built-in failover SPI implementations: - * <ul> - * <li>{@link GridNeverFailoverSpi}</li> - * <li>{@link GridAlwaysFailoverSpi}</li> - * <li>{@link GridJobStealingFailoverSpi}</li> - * </ul> - * <b>NOTE:</b> this SPI (i.e. methods in this interface) should never be used directly. SPIs provide - * internal view on the subsystem and is used internally by GridGain kernal. In rare use cases when - * access to a specific implementation of this SPI is required - an instance of this SPI can be obtained - * via {@link org.apache.ignite.Ignite#configuration()} method to check its configuration properties or call other non-SPI - * methods. Note again that calling methods from this interface on the obtained instance can lead - * to undefined behavior and explicitly not supported. - */ -public interface GridFailoverSpi extends IgniteSpi { - /** - * This method is called when method {@link org.apache.ignite.compute.ComputeTask#result(org.apache.ignite.compute.ComputeJobResult, List)} returns - * value {@link org.apache.ignite.compute.ComputeJobResultPolicy#FAILOVER} policy indicating that the result of - * job execution must be failed over. Implementation of this method should examine failover - * context and choose one of the grid nodes from supplied {@code topology} to retry job execution - * on it. For best performance it is advised that {@link GridFailoverContext#getBalancedNode(List)} - * method is used to select node for execution of failed job. - * - * @param ctx Failover context. - * @param top Collection of all grid nodes within task topology (may include failed node). - * @return New node to route this job to or {@code null} if new node cannot be picked. - * If job failover fails (returns {@code null}) the whole task will be failed. - */ - public ClusterNode failover(GridFailoverContext ctx, List<ClusterNode> top); -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/50d1554c/modules/core/src/main/java/org/gridgain/grid/spi/failover/always/AlwaysFailoverSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/failover/always/AlwaysFailoverSpi.java b/modules/core/src/main/java/org/gridgain/grid/spi/failover/always/AlwaysFailoverSpi.java new file mode 100644 index 0000000..a33e519 --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/spi/failover/always/AlwaysFailoverSpi.java @@ -0,0 +1,238 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.spi.failover.always; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.spi.*; +import org.gridgain.grid.*; +import org.gridgain.grid.spi.failover.*; +import org.gridgain.grid.util.typedef.*; +import org.gridgain.grid.util.typedef.internal.*; + +import java.util.*; + +/** + * Failover SPI that always reroutes a failed job to another node. + * Note, that at first an attempt will be made to reroute the failed job + * to a node that was not part of initial split for a better chance of + * success. If no such nodes are available, then an attempt will be made to + * reroute the failed job to the nodes in the initial split minus the node + * the job is failed on. If none of the above attempts succeeded, then the + * job will not be failed over and {@code null} will be returned. + * <p> + * <h1 class="header">Configuration</h1> + * This SPI is default failover SPI and does not have to be explicitly + * configured unless configuration parameters need to be changed. + * <h2 class="header">Mandatory</h2> + * This SPI has no mandatory configuration parameters. + * <h2 class="header">Optional</h2> + * This SPI has following optional configuration parameters: + * <ul> + * <li> + * Maximum failover attempts for a single job (see {@link #setMaximumFailoverAttempts(int)}). + * If maximum failover attempts is reached, then job will not be failed-over and, + * hence, will fail. + * </li> + * </ul> + * Here is a Java example how to configure grid with {@code GridAlwaysFailoverSpi} failover SPI. + * <pre name="code" class="java"> + * GridAlwaysFailoverSpi spi = new GridAlwaysFailoverSpi(); + * + * // Override maximum failover attempts. + * spi.setMaximumFailoverAttempts(5); + * + * GridConfiguration cfg = new GridConfiguration(); + * + * // Override default failover SPI. + * cfg.setFailoverSpiSpi(spi); + * + * // Starts grid. + * G.start(cfg); + * </pre> + * Here is an example of how to configure {@code GridAlwaysFailoverSpi} from Spring XML configuration file. + * <pre name="code" class="xml"> + * <property name="failoverSpi"> + * <bean class="org.gridgain.grid.spi.failover.always.GridAlwaysFailoverSpi"> + * <property name="maximumFailoverAttempts" value="5"/> + * </bean> + * </property> + * </pre> + * <p> + * <img src="http://www.gridgain.com/images/spring-small.png"> + * <br> + * For information about Spring framework visit <a href="http://www.springframework.org/">www.springframework.org</a> + * @see org.gridgain.grid.spi.failover.FailoverSpi + */ +@IgniteSpiMultipleInstancesSupport(true) +@IgniteSpiConsistencyChecked(optional = true) +public class AlwaysFailoverSpi extends IgniteSpiAdapter implements FailoverSpi, AlwaysFailoverSpiMBean { + /** Maximum number of attempts to execute a failed job on another node (default is {@code 5}). */ + public static final int DFLT_MAX_FAILOVER_ATTEMPTS = 5; + + /** + * Name of job context attribute containing all nodes a job failed on. + * + * @see org.apache.ignite.compute.ComputeJobContext + */ + public static final String FAILED_NODE_LIST_ATTR = "gg:failover:failednodelist"; + + /** Maximum attempts attribute key should be the same on all nodes. */ + public static final String MAX_FAILOVER_ATTEMPT_ATTR = "gg:failover:maxattempts"; + + /** Injected grid logger. */ + @IgniteLoggerResource + private IgniteLogger log; + + /** Maximum number of attempts to execute a failed job on another node. */ + private int maxFailoverAttempts = DFLT_MAX_FAILOVER_ATTEMPTS; + + /** Number of jobs that were failed over. */ + private int totalFailoverJobs; + + /** {@inheritDoc} */ + @Override public int getMaximumFailoverAttempts() { + return maxFailoverAttempts; + } + + /** + * Sets maximum number of attempts to execute a failed job on another node. + * If not specified, {@link #DFLT_MAX_FAILOVER_ATTEMPTS} value will be used. + * + * @param maxFailoverAttempts Maximum number of attempts to execute a failed job on another node. + */ + @IgniteSpiConfiguration(optional = true) + public void setMaximumFailoverAttempts(int maxFailoverAttempts) { + this.maxFailoverAttempts = maxFailoverAttempts; + } + + /** {@inheritDoc} */ + @Override public int getTotalFailoverJobsCount() { + return totalFailoverJobs; + } + + /** {@inheritDoc} */ + @Override public Map<String, Object> getNodeAttributes() throws IgniteSpiException { + return F.<String, Object>asMap(createSpiAttributeName(MAX_FAILOVER_ATTEMPT_ATTR), maxFailoverAttempts); + } + + /** {@inheritDoc} */ + @Override public void spiStart(String gridName) throws IgniteSpiException { + // Start SPI start stopwatch. + startStopwatch(); + + assertParameter(maxFailoverAttempts >= 0, "maxFailoverAttempts >= 0"); + + if (log.isDebugEnabled()) + log.debug(configInfo("maximumFailoverAttempts", maxFailoverAttempts)); + + registerMBean(gridName, this, AlwaysFailoverSpiMBean.class); + + // Ack ok start. + if (log.isDebugEnabled()) + log.debug(startInfo()); + } + + /** {@inheritDoc} */ + @Override public void spiStop() throws IgniteSpiException { + unregisterMBean(); + + // Ack ok stop. + if (log.isDebugEnabled()) + log.debug(stopInfo()); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public ClusterNode failover(FailoverContext ctx, List<ClusterNode> top) { + assert ctx != null; + assert top != null; + + if (log.isDebugEnabled()) + log.debug("Received failed job result: " + ctx.getJobResult()); + + if (top.isEmpty()) { + U.warn(log, "Received empty topology for failover and is forced to fail."); + + // Nowhere to failover to. + return null; + } + + Collection<UUID> failedNodes = ctx.getJobResult().getJobContext().getAttribute(FAILED_NODE_LIST_ATTR); + + if (failedNodes == null) + failedNodes = U.newHashSet(1); + + Integer failoverCnt = failedNodes.size(); + + if (failoverCnt >= maxFailoverAttempts) { + U.warn(log, "Job failover failed because number of maximum failover attempts is exceeded [failedJob=" + + ctx.getJobResult().getJob() + ", maxFailoverAttempts=" + maxFailoverAttempts + ']'); + + return null; + } + + failedNodes.add(ctx.getJobResult().getNode().id()); + + // Copy. + List<ClusterNode> newTop = new ArrayList<>(top.size()); + + for (ClusterNode node : top) + if (!failedNodes.contains(node.id())) + newTop.add(node); + + if (newTop.isEmpty()) { + U.warn(log, "Received topology with only nodes that job had failed on (forced to fail) [failedNodes=" + + failedNodes + ']'); + + // Nowhere to failover to. + return null; + } + + try { + ClusterNode node = ctx.getBalancedNode(newTop); + + if (node == null) + U.warn(log, "Load balancer returned null node for topology: " + newTop); + else { + // Increment failover count. + ctx.getJobResult().getJobContext().setAttribute(FAILED_NODE_LIST_ATTR, failedNodes); + + totalFailoverJobs++; + } + + if (node != null) + U.warn(log, "Failed over job to a new node [newNode=" + node.id() + + ", oldNode=" + ctx.getJobResult().getNode().id() + + ", sesId=" + ctx.getTaskSession().getId() + + ", job=" + ctx.getJobResult().getJob() + + ", jobCtx=" + ctx.getJobResult().getJobContext() + + ", task=" + ctx.getTaskSession().getTaskName() + ']'); + + return node; + } + catch (GridException e) { + U.error(log, "Failed to get next balanced node for failover: " + ctx, e); + + return null; + } + } + + /** {@inheritDoc} */ + @Override protected List<String> getConsistentAttributeNames() { + return Collections.singletonList(createSpiAttributeName(MAX_FAILOVER_ATTEMPT_ATTR)); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(AlwaysFailoverSpi.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/50d1554c/modules/core/src/main/java/org/gridgain/grid/spi/failover/always/AlwaysFailoverSpiMBean.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/failover/always/AlwaysFailoverSpiMBean.java b/modules/core/src/main/java/org/gridgain/grid/spi/failover/always/AlwaysFailoverSpiMBean.java new file mode 100644 index 0000000..d91be08 --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/spi/failover/always/AlwaysFailoverSpiMBean.java @@ -0,0 +1,36 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.spi.failover.always; + +import org.apache.ignite.mbean.*; +import org.apache.ignite.spi.*; + +/** + * Management bean for {@link AlwaysFailoverSpi}. + */ +@IgniteMBeanDescription("MBean that provides access to always failover SPI configuration.") +public interface AlwaysFailoverSpiMBean extends IgniteSpiManagementMBean { + /** + * Gets maximum number of attempts to execute a failed job on another node. + * If not specified, {@link AlwaysFailoverSpi#DFLT_MAX_FAILOVER_ATTEMPTS} value will be used. + * + * @return Maximum number of attempts to execute a failed job on another node. + */ + @IgniteMBeanDescription("Maximum number of attempts to execute a failed job on another node.") + public int getMaximumFailoverAttempts(); + + /** + * Get total number of jobs that were failed over. + * + * @return Total number of failed over jobs. + */ + @IgniteMBeanDescription("Total number of jobs that were failed over.") + public int getTotalFailoverJobsCount(); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/50d1554c/modules/core/src/main/java/org/gridgain/grid/spi/failover/always/GridAlwaysFailoverSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/failover/always/GridAlwaysFailoverSpi.java b/modules/core/src/main/java/org/gridgain/grid/spi/failover/always/GridAlwaysFailoverSpi.java deleted file mode 100644 index 81975eb..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/spi/failover/always/GridAlwaysFailoverSpi.java +++ /dev/null @@ -1,238 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.spi.failover.always; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.resources.*; -import org.apache.ignite.spi.*; -import org.gridgain.grid.*; -import org.gridgain.grid.spi.failover.*; -import org.gridgain.grid.util.typedef.*; -import org.gridgain.grid.util.typedef.internal.*; - -import java.util.*; - -/** - * Failover SPI that always reroutes a failed job to another node. - * Note, that at first an attempt will be made to reroute the failed job - * to a node that was not part of initial split for a better chance of - * success. If no such nodes are available, then an attempt will be made to - * reroute the failed job to the nodes in the initial split minus the node - * the job is failed on. If none of the above attempts succeeded, then the - * job will not be failed over and {@code null} will be returned. - * <p> - * <h1 class="header">Configuration</h1> - * This SPI is default failover SPI and does not have to be explicitly - * configured unless configuration parameters need to be changed. - * <h2 class="header">Mandatory</h2> - * This SPI has no mandatory configuration parameters. - * <h2 class="header">Optional</h2> - * This SPI has following optional configuration parameters: - * <ul> - * <li> - * Maximum failover attempts for a single job (see {@link #setMaximumFailoverAttempts(int)}). - * If maximum failover attempts is reached, then job will not be failed-over and, - * hence, will fail. - * </li> - * </ul> - * Here is a Java example how to configure grid with {@code GridAlwaysFailoverSpi} failover SPI. - * <pre name="code" class="java"> - * GridAlwaysFailoverSpi spi = new GridAlwaysFailoverSpi(); - * - * // Override maximum failover attempts. - * spi.setMaximumFailoverAttempts(5); - * - * GridConfiguration cfg = new GridConfiguration(); - * - * // Override default failover SPI. - * cfg.setFailoverSpiSpi(spi); - * - * // Starts grid. - * G.start(cfg); - * </pre> - * Here is an example of how to configure {@code GridAlwaysFailoverSpi} from Spring XML configuration file. - * <pre name="code" class="xml"> - * <property name="failoverSpi"> - * <bean class="org.gridgain.grid.spi.failover.always.GridAlwaysFailoverSpi"> - * <property name="maximumFailoverAttempts" value="5"/> - * </bean> - * </property> - * </pre> - * <p> - * <img src="http://www.gridgain.com/images/spring-small.png"> - * <br> - * For information about Spring framework visit <a href="http://www.springframework.org/">www.springframework.org</a> - * @see GridFailoverSpi - */ -@IgniteSpiMultipleInstancesSupport(true) -@IgniteSpiConsistencyChecked(optional = true) -public class GridAlwaysFailoverSpi extends IgniteSpiAdapter implements GridFailoverSpi, GridAlwaysFailoverSpiMBean { - /** Maximum number of attempts to execute a failed job on another node (default is {@code 5}). */ - public static final int DFLT_MAX_FAILOVER_ATTEMPTS = 5; - - /** - * Name of job context attribute containing all nodes a job failed on. - * - * @see org.apache.ignite.compute.ComputeJobContext - */ - public static final String FAILED_NODE_LIST_ATTR = "gg:failover:failednodelist"; - - /** Maximum attempts attribute key should be the same on all nodes. */ - public static final String MAX_FAILOVER_ATTEMPT_ATTR = "gg:failover:maxattempts"; - - /** Injected grid logger. */ - @IgniteLoggerResource - private IgniteLogger log; - - /** Maximum number of attempts to execute a failed job on another node. */ - private int maxFailoverAttempts = DFLT_MAX_FAILOVER_ATTEMPTS; - - /** Number of jobs that were failed over. */ - private int totalFailoverJobs; - - /** {@inheritDoc} */ - @Override public int getMaximumFailoverAttempts() { - return maxFailoverAttempts; - } - - /** - * Sets maximum number of attempts to execute a failed job on another node. - * If not specified, {@link #DFLT_MAX_FAILOVER_ATTEMPTS} value will be used. - * - * @param maxFailoverAttempts Maximum number of attempts to execute a failed job on another node. - */ - @IgniteSpiConfiguration(optional = true) - public void setMaximumFailoverAttempts(int maxFailoverAttempts) { - this.maxFailoverAttempts = maxFailoverAttempts; - } - - /** {@inheritDoc} */ - @Override public int getTotalFailoverJobsCount() { - return totalFailoverJobs; - } - - /** {@inheritDoc} */ - @Override public Map<String, Object> getNodeAttributes() throws IgniteSpiException { - return F.<String, Object>asMap(createSpiAttributeName(MAX_FAILOVER_ATTEMPT_ATTR), maxFailoverAttempts); - } - - /** {@inheritDoc} */ - @Override public void spiStart(String gridName) throws IgniteSpiException { - // Start SPI start stopwatch. - startStopwatch(); - - assertParameter(maxFailoverAttempts >= 0, "maxFailoverAttempts >= 0"); - - if (log.isDebugEnabled()) - log.debug(configInfo("maximumFailoverAttempts", maxFailoverAttempts)); - - registerMBean(gridName, this, GridAlwaysFailoverSpiMBean.class); - - // Ack ok start. - if (log.isDebugEnabled()) - log.debug(startInfo()); - } - - /** {@inheritDoc} */ - @Override public void spiStop() throws IgniteSpiException { - unregisterMBean(); - - // Ack ok stop. - if (log.isDebugEnabled()) - log.debug(stopInfo()); - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public ClusterNode failover(GridFailoverContext ctx, List<ClusterNode> top) { - assert ctx != null; - assert top != null; - - if (log.isDebugEnabled()) - log.debug("Received failed job result: " + ctx.getJobResult()); - - if (top.isEmpty()) { - U.warn(log, "Received empty topology for failover and is forced to fail."); - - // Nowhere to failover to. - return null; - } - - Collection<UUID> failedNodes = ctx.getJobResult().getJobContext().getAttribute(FAILED_NODE_LIST_ATTR); - - if (failedNodes == null) - failedNodes = U.newHashSet(1); - - Integer failoverCnt = failedNodes.size(); - - if (failoverCnt >= maxFailoverAttempts) { - U.warn(log, "Job failover failed because number of maximum failover attempts is exceeded [failedJob=" + - ctx.getJobResult().getJob() + ", maxFailoverAttempts=" + maxFailoverAttempts + ']'); - - return null; - } - - failedNodes.add(ctx.getJobResult().getNode().id()); - - // Copy. - List<ClusterNode> newTop = new ArrayList<>(top.size()); - - for (ClusterNode node : top) - if (!failedNodes.contains(node.id())) - newTop.add(node); - - if (newTop.isEmpty()) { - U.warn(log, "Received topology with only nodes that job had failed on (forced to fail) [failedNodes=" + - failedNodes + ']'); - - // Nowhere to failover to. - return null; - } - - try { - ClusterNode node = ctx.getBalancedNode(newTop); - - if (node == null) - U.warn(log, "Load balancer returned null node for topology: " + newTop); - else { - // Increment failover count. - ctx.getJobResult().getJobContext().setAttribute(FAILED_NODE_LIST_ATTR, failedNodes); - - totalFailoverJobs++; - } - - if (node != null) - U.warn(log, "Failed over job to a new node [newNode=" + node.id() + - ", oldNode=" + ctx.getJobResult().getNode().id() + - ", sesId=" + ctx.getTaskSession().getId() + - ", job=" + ctx.getJobResult().getJob() + - ", jobCtx=" + ctx.getJobResult().getJobContext() + - ", task=" + ctx.getTaskSession().getTaskName() + ']'); - - return node; - } - catch (GridException e) { - U.error(log, "Failed to get next balanced node for failover: " + ctx, e); - - return null; - } - } - - /** {@inheritDoc} */ - @Override protected List<String> getConsistentAttributeNames() { - return Collections.singletonList(createSpiAttributeName(MAX_FAILOVER_ATTEMPT_ATTR)); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridAlwaysFailoverSpi.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/50d1554c/modules/core/src/main/java/org/gridgain/grid/spi/failover/always/GridAlwaysFailoverSpiMBean.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/failover/always/GridAlwaysFailoverSpiMBean.java b/modules/core/src/main/java/org/gridgain/grid/spi/failover/always/GridAlwaysFailoverSpiMBean.java deleted file mode 100644 index 74aeec2..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/spi/failover/always/GridAlwaysFailoverSpiMBean.java +++ /dev/null @@ -1,36 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.spi.failover.always; - -import org.apache.ignite.mbean.*; -import org.apache.ignite.spi.*; - -/** - * Management bean for {@link GridAlwaysFailoverSpi}. - */ -@IgniteMBeanDescription("MBean that provides access to always failover SPI configuration.") -public interface GridAlwaysFailoverSpiMBean extends IgniteSpiManagementMBean { - /** - * Gets maximum number of attempts to execute a failed job on another node. - * If not specified, {@link GridAlwaysFailoverSpi#DFLT_MAX_FAILOVER_ATTEMPTS} value will be used. - * - * @return Maximum number of attempts to execute a failed job on another node. - */ - @IgniteMBeanDescription("Maximum number of attempts to execute a failed job on another node.") - public int getMaximumFailoverAttempts(); - - /** - * Get total number of jobs that were failed over. - * - * @return Total number of failed over jobs. - */ - @IgniteMBeanDescription("Total number of jobs that were failed over.") - public int getTotalFailoverJobsCount(); -}