# IGNITE-883 Create and use GridTimeoutProcessor.schedule()
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/997d65e2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/997d65e2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/997d65e2 Branch: refs/heads/ignite-389 Commit: 997d65e2aeb5d1c7a62a95120876238b0de6d3d4 Parents: bfae889 Author: sevdokimov <sevdoki...@gridgain.com> Authored: Mon Jun 1 18:10:28 2015 +0300 Committer: sevdokimov <sevdoki...@gridgain.com> Committed: Mon Jun 1 19:50:46 2015 +0300 ---------------------------------------------------------------------- .../apache/ignite/internal/IgniteKernal.java | 28 +---- .../discovery/GridDiscoveryManager.java | 28 ++--- .../timeout/GridTimeoutProcessor.java | 107 ++++++++++++++++++- 3 files changed, 117 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/997d65e2/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index 8a7dc70..e3fc50f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -167,14 +167,6 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { @GridToStringExclude private Timer updateNtfTimer; - /** */ - @GridToStringExclude - private Timer starveTimer; - - /** */ - @GridToStringExclude - private Timer metricsLogTimer; - /** Indicate error on grid stop. 
*/ @GridToStringExclude private boolean errOnStop; @@ -867,13 +859,11 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { if (starveCheck) { final long interval = F.isEmpty(intervalStr) ? PERIODIC_STARVATION_CHECK_FREQ : Long.parseLong(intervalStr); - starveTimer = new Timer("ignite-starvation-checker"); - - starveTimer.scheduleAtFixedRate(new GridTimerTask() { + ctx.timeout().schedule(new Runnable() { /** Last completed task count. */ private long lastCompletedCnt; - @Override protected void safeRun() { + @Override public void run() { if (!(execSvc instanceof ThreadPoolExecutor)) return; @@ -896,13 +886,10 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { long metricsLogFreq = cfg.getMetricsLogFrequency(); if (metricsLogFreq > 0) { - metricsLogTimer = new Timer("ignite-metrics-logger"); - - metricsLogTimer.scheduleAtFixedRate(new GridTimerTask() { - /** */ + ctx.timeout().schedule(new Runnable() { private final DecimalFormat dblFmt = new DecimalFormat("#.##"); - @Override protected void safeRun() { + @Override public void run() { if (log.isInfoEnabled()) { ClusterMetrics m = cluster().localNode().metrics(); @@ -1713,13 +1700,6 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { if (updateNtfTimer != null) updateNtfTimer.cancel(); - if (starveTimer != null) - starveTimer.cancel(); - - // Cancel metrics log timer. 
- if (metricsLogTimer != null) - metricsLogTimer.cancel(); - boolean interrupted = false; while (true) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/997d65e2/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index 4ef602e..9b8280e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -30,6 +30,7 @@ import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.jobmetrics.*; import org.apache.ignite.internal.processors.security.*; +import org.apache.ignite.internal.processors.timeout.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.lang.*; @@ -165,7 +166,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { private final GridLocalMetrics metrics = createMetrics(); /** Metrics update worker. */ - private final MetricsUpdater metricsUpdater = new MetricsUpdater(); + private GridTimeoutProcessor.CancelableTask metricsUpdateTask; /** Custom event listener. 
*/ private ConcurrentMap<Class<?>, List<CustomEventListener<DiscoveryCustomMessage>>> customEvtLsnrs = @@ -325,7 +326,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { checkSegmentOnStart(); } - new IgniteThread(metricsUpdater).start(); + metricsUpdateTask = ctx.timeout().schedule(new MetricsUpdater(), METRICS_UPDATE_FREQ, METRICS_UPDATE_FREQ); spi.setMetricsProvider(createMetricsProvider()); @@ -987,11 +988,11 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { getSpi().setListener(null); // Stop discovery worker and metrics updater. + U.closeQuiet(metricsUpdateTask); + U.cancel(discoWrk); - U.cancel(metricsUpdater); U.join(discoWrk, log); - U.join(metricsUpdater, log); // Stop SPI itself. stopSpi(); @@ -1879,28 +1880,17 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { /** * */ - private class MetricsUpdater extends GridWorker { + private class MetricsUpdater implements Runnable { /** */ private long prevGcTime = -1; /** */ private long prevCpuTime = -1; - /** - * - */ - private MetricsUpdater() { - super(ctx.gridName(), "metrics-updater", GridDiscoveryManager.this.log); - } - /** {@inheritDoc} */ - @Override protected void body() throws IgniteInterruptedCheckedException { - while (!isCancelled()) { - U.sleep(METRICS_UPDATE_FREQ); - - gcCpuLoad = getGcCpuLoad(); - cpuLoad = getCpuLoad(); - } + @Override public void run() { + gcCpuLoad = getGcCpuLoad(); + cpuLoad = getCpuLoad(); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/997d65e2/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java index 81ff72b..e9b7717 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java @@ -24,8 +24,10 @@ import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.internal.util.worker.*; +import org.apache.ignite.lang.*; import org.apache.ignite.thread.*; +import java.io.*; import java.util.*; /** @@ -40,10 +42,12 @@ public class GridTimeoutProcessor extends GridProcessorAdapter { new GridConcurrentSkipListSet<>(new Comparator<GridTimeoutObject>() { /** {@inheritDoc} */ @Override public int compare(GridTimeoutObject o1, GridTimeoutObject o2) { - long time1 = o1.endTime(); - long time2 = o2.endTime(); + int res = Long.compare(o1.endTime(), o2.endTime()); - return time1 < time2 ? -1 : time1 > time2 ? 1 : o1.timeoutId().compareTo(o2.timeoutId()); + if (res != 0) + return res; + + return o1.timeoutId().compareTo(o2.timeoutId()); } }); @@ -98,6 +102,26 @@ public class GridTimeoutProcessor extends GridProcessorAdapter { } /** + * Schedule the specified timer task for execution at the specified + * time with the specified period, in milliseconds. + * + * @param task Task to execute. + * @param delay Delay to first execution in milliseconds. + * @param period Period for execution in milliseconds or -1. + * @return Cancelable to cancel task. + */ + public CancelableTask schedule(Runnable task, long delay, long period) { + assert delay >= 0; + assert period > 0 || period == -1; + + CancelableTask obj = new CancelableTask(task, U.currentTimeMillis() + delay, period); + + addTimeoutObject(obj); + + return obj; + } + + /** * @param timeoutObj Timeout object. 
*/ public void removeTimeoutObject(GridTimeoutObject timeoutObj) { @@ -173,4 +197,81 @@ public class GridTimeoutProcessor extends GridProcessorAdapter { X.println(">>> Timeout processor memory stats [grid=" + ctx.gridName() + ']'); X.println(">>> timeoutObjsSize: " + timeoutObjs.size()); } + + /** + * + */ + public class CancelableTask implements GridTimeoutObject, Closeable { + /** */ + private final IgniteUuid id = new IgniteUuid(); + + /** */ + private long endTime; + + /** */ + private final long period; + + /** */ + private volatile boolean cancel; + + /** */ + private final Runnable task; + + /** + * @param firstTime First time. + * @param period Period. + * @param task Task to execute. + */ + CancelableTask(Runnable task, long firstTime, long period) { + this.task = task; + endTime = firstTime; + this.period = period; + } + + /** {@inheritDoc} */ + @Override public IgniteUuid timeoutId() { + return id; + } + + /** {@inheritDoc} */ + @Override public long endTime() { + return endTime; + } + + /** {@inheritDoc} */ + @Override public synchronized void onTimeout() { + if (cancel) + return; + + long startTime = U.currentTimeMillis(); + + try { + task.run(); + } + finally { + long executionTime = U.currentTimeMillis() - startTime; + + if (executionTime > 10) { + U.warn(log, "Timer task take a lot of time, tasks submitted to GridTimeoutProcessor must work " + + "quickly [executionTime=" + executionTime + ']'); + } + + if (!cancel && period > 0) { + endTime = U.currentTimeMillis() + period; + + addTimeoutObject(this); + } + } + } + + /** {@inheritDoc} */ + @Override public void close() { + cancel = true; + + synchronized (this) { + // Just waiting for task execution end to make sure that task will not be executed anymore. + removeTimeoutObject(this); + } + } + } }