Repository: incubator-ignite Updated Branches: refs/heads/ignite-496 c4f6e7563 -> 227220ae6
# IGNITE-368 WIP. Refactored suppressed errors registry. Created Visor task. Minor code cleanup. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a366928f Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a366928f Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a366928f Branch: refs/heads/ignite-496 Commit: a366928f84be9b04393c15fbaf1f5e2fd3ca1dcb Parents: 47539d8 Author: AKuznetsov <akuznet...@gridgain.com> Authored: Fri Feb 27 16:09:14 2015 +0700 Committer: AKuznetsov <akuznet...@gridgain.com> Committed: Fri Feb 27 16:09:14 2015 +0700 ---------------------------------------------------------------------- .../org/apache/ignite/internal/IgniteEx.java | 7 ++ .../internal/util/IgniteExceptionRegistry.java | 64 ++++++++----- .../apache/ignite/internal/visor/VisorJob.java | 3 + .../internal/visor/VisorMultiNodeTask.java | 1 + .../compute/VisorComputeMonitoringHolder.java | 10 +-- .../visor/node/VisorNodeDataCollectorJob.java | 3 +- .../visor/node/VisorNodeLastErrorsTask.java | 94 ++++++++++++++++++++ .../internal/visor/query/VisorQueryTask.java | 3 +- .../util/IgniteExceptionRegistrySelfTest.java | 12 ++- 9 files changed, 161 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a366928f/modules/core/src/main/java/org/apache/ignite/internal/IgniteEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteEx.java index 3c35a08..7bb958d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteEx.java @@ -142,4 +142,11 @@ public interface IgniteEx extends Ignite { * @return Local grid node. 
*/ public ClusterNode localNode(); + + /** + * Internal context. + * + * @return Kernal context. + */ + public GridKernalContext context(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a366928f/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteExceptionRegistry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteExceptionRegistry.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteExceptionRegistry.java index ab1d9bf..8a88fc4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteExceptionRegistry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteExceptionRegistry.java @@ -21,6 +21,7 @@ import org.apache.ignite.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.internal.*; +import java.io.*; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; @@ -56,15 +57,16 @@ public class IgniteExceptionRegistry { */ public IgniteExceptionRegistry(IgniteLogger log) { this.log = log; - this.queue = new ConcurrentLinkedDeque<>(); + + queue = new ConcurrentLinkedDeque<>(); } /** * Default constructor. */ protected IgniteExceptionRegistry() { - this.log = null; - this.queue = null; + log = null; + queue = null; } /** @@ -74,26 +76,31 @@ public class IgniteExceptionRegistry { * @param e Exception. */ public void onException(String msg, Throwable e) { - errorCnt.incrementAndGet(); + long order = errorCnt.incrementAndGet(); // Remove extra entity. while (queue.size() >= maxSize) queue.pollLast(); - queue.offerFirst(new ExceptionInfo(e, msg, Thread.currentThread().getId(), + queue.offerFirst(new ExceptionInfo(order, e, msg, Thread.currentThread().getId(), Thread.currentThread().getName(), U.currentTimeMillis())); } /** - * Gets exceptions. + * Gets suppressed errors. * - * @return Exceptions. 
+ * @param order Order number to filter errors. + * @return List of exceptions that happened after specified order. */ - Collection<ExceptionInfo> getErrors() { + public List<ExceptionInfo> getErrors(long order) { List<ExceptionInfo> errors = new ArrayList<>(); - for (ExceptionInfo entry : queue) - errors.add(entry); + for (ExceptionInfo error : queue) { + if (error.order <= order) + break; + + errors.add(error); + } return errors; } @@ -144,9 +151,16 @@ public class IgniteExceptionRegistry { } /** - * + * Detailed info about suppressed error. */ - static class ExceptionInfo { + @SuppressWarnings("PublicInnerClass") + public static class ExceptionInfo implements Serializable { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final long order; + /** */ @GridToStringExclude private final Throwable error; @@ -166,13 +180,16 @@ public class IgniteExceptionRegistry { /** * Constructor. * - * @param exception Exception. - * @param threadId Thread id. + * @param order Locally unique ID that is atomically incremented for each new error. + * @param error Suppressed error. + * @param msg Message that describe reason why error was suppressed. + * @param threadId Thread ID. * @param threadName Thread name. * @param time Occurrence time. */ - public ExceptionInfo(Throwable exception, String msg, long threadId, String threadName, long time) { - this.error = exception; + public ExceptionInfo(long order, Throwable error, String msg, long threadId, String threadName, long time) { + this.order = order; + this.error = error; this.threadId = threadId; this.threadName = threadName; this.time = time; @@ -180,21 +197,28 @@ public class IgniteExceptionRegistry { } /** - * @return Gets message. + * Locally unique ID that is atomically incremented for each new error. + */ + public long order() { + return order; + } + + /** + * @return Gets message that describe reason why error was suppressed. 
*/ public String message() { return msg; } /** - * @return Exception. + * @return Suppressed error. */ public Throwable error() { return error; } /** - * @return Gets thread id. + * @return Gets thread ID. */ public long threadId() { return threadId; @@ -237,7 +261,7 @@ public class IgniteExceptionRegistry { } /** {@inheritDoc} */ - @Override Collection<ExceptionInfo> getErrors() { + @Override public List<ExceptionInfo> getErrors(long order) { return Collections.emptyList(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a366928f/modules/core/src/main/java/org/apache/ignite/internal/visor/VisorJob.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/VisorJob.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/VisorJob.java index c8a99f7..34414b5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/VisorJob.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/VisorJob.java @@ -28,6 +28,9 @@ import static org.apache.ignite.internal.visor.util.VisorTaskUtils.*; /** * Base class for Visor jobs. + * + * @param <A> Job argument type. + * @param <R> Job result type. */ public abstract class VisorJob<A, R> extends ComputeJobAdapter { /** Auto-injected grid instance. 
*/ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a366928f/modules/core/src/main/java/org/apache/ignite/internal/visor/VisorMultiNodeTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/VisorMultiNodeTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/VisorMultiNodeTask.java index b92cfe2..7295760 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/VisorMultiNodeTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/VisorMultiNodeTask.java @@ -34,6 +34,7 @@ import static org.apache.ignite.internal.visor.util.VisorTaskUtils.*; * * @param <A> Task argument type. * @param <R> Task result type. + * @param <J> Job result type */ public abstract class VisorMultiNodeTask<A, R, J> implements ComputeTask<VisorTaskArgument<A>, R> { /** Auto-injected grid instance. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a366928f/modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorComputeMonitoringHolder.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorComputeMonitoringHolder.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorComputeMonitoringHolder.java index 2036d70..5d92df5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorComputeMonitoringHolder.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorComputeMonitoringHolder.java @@ -95,17 +95,17 @@ public class VisorComputeMonitoringHolder { /** * Schedule cleanup process for events monitoring. * - * @param g grid. + * @param ignite grid. 
*/ - private void scheduleCleanupJob(final IgniteEx g) { - ((IgniteKernal)g).context().timeout().addTimeoutObject(new GridTimeoutObjectAdapter(CLEANUP_TIMEOUT) { + private void scheduleCleanupJob(final IgniteEx ignite) { + ignite.context().timeout().addTimeoutObject(new GridTimeoutObjectAdapter(CLEANUP_TIMEOUT) { @Override public void onTimeout() { synchronized (listenVisor) { - if (tryDisableEvents(g)) { + if (tryDisableEvents(ignite)) { for (String visorKey : listenVisor.keySet()) listenVisor.put(visorKey, false); - scheduleCleanupJob(g); + scheduleCleanupJob(ignite); } else cleanupStopped = true; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a366928f/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java index 43a987c..c9c68fc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java @@ -20,7 +20,6 @@ package org.apache.ignite.internal.visor.node; import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.configuration.*; -import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.igfs.*; import org.apache.ignite.internal.util.ipc.*; import org.apache.ignite.internal.util.typedef.internal.*; @@ -149,7 +148,7 @@ public class VisorNodeDataCollectorJob extends VisorJob<VisorNodeDataCollectorTa */ protected void igfs(VisorNodeDataCollectorJobResult res) { try { - IgfsProcessorAdapter igfsProc = ((IgniteKernal)ignite).context().igfs(); + IgfsProcessorAdapter igfsProc = ignite.context().igfs(); for (IgniteFs igfs : igfsProc.igfss()) { long 
start0 = U.currentTimeMillis(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a366928f/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeLastErrorsTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeLastErrorsTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeLastErrorsTask.java new file mode 100644 index 0000000..980c6a3 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeLastErrorsTask.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.visor.node; + +import org.apache.ignite.compute.*; +import org.apache.ignite.internal.processors.task.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.internal.visor.*; +import org.apache.ignite.lang.*; +import org.jetbrains.annotations.*; + +import java.util.*; + +/** + * Task to collect last errors on nodes. 
+ */
+@GridInternal
+public class VisorNodeLastErrorsTask extends VisorMultiNodeTask<Map<UUID, Long>,
+    Map<UUID, IgniteBiTuple<Long, List<IgniteExceptionRegistry.ExceptionInfo>>>,
+    IgniteBiTuple<Long, List<IgniteExceptionRegistry.ExceptionInfo>>> {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** {@inheritDoc} */
+    @Override protected VisorNodeLastErrorsJob job(Map<UUID, Long> arg) {
+        return new VisorNodeLastErrorsJob(arg, debug);
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override protected Map<UUID, IgniteBiTuple<Long, List<IgniteExceptionRegistry.ExceptionInfo>>>
+        reduce0(List<ComputeJobResult> results) {
+        Map<UUID, IgniteBiTuple<Long, List<IgniteExceptionRegistry.ExceptionInfo>>> taskRes =
+            new HashMap<>(results.size());
+
+        for (ComputeJobResult res : results) {
+            IgniteBiTuple<Long, List<IgniteExceptionRegistry.ExceptionInfo>> jobRes = res.getData();
+
+            taskRes.put(res.getNode().id(), jobRes);
+        }
+
+        return taskRes;
+    }
+
+    /**
+     * Job to collect last errors on nodes.
+     */
+    private static class VisorNodeLastErrorsJob extends VisorJob<Map<UUID, Long>,
+        IgniteBiTuple<Long, List<IgniteExceptionRegistry.ExceptionInfo>>> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /**
+         * Create job with given argument.
+         *
+         * @param arg Map with last error counter.
+         * @param debug Debug flag.
+         */
+        private VisorNodeLastErrorsJob(Map<UUID, Long> arg, boolean debug) {
+            super(arg, debug);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected IgniteBiTuple<Long, List<IgniteExceptionRegistry.ExceptionInfo>> run(Map<UUID, Long> arg) {
+            Long no = arg.get(ignite.localNode().id());
+
+            long order = no != null ? no : 0;
+
+            List<IgniteExceptionRegistry.ExceptionInfo> errors = ignite.context().exceptionRegistry().getErrors(order);
+            // Return null-safe 'order', not 'no': a Long/long ternary unboxes 'no' and would throw NPE when it is null.
+            return new IgniteBiTuple<>(errors.isEmpty() ? order : errors.get(0).order(), errors);
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(VisorNodeLastErrorsJob.class, this);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a366928f/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryTask.java index 0068b8c..a0bd346 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryTask.java @@ -19,7 +19,6 @@ package org.apache.ignite.internal.visor.query; import org.apache.ignite.*; import org.apache.ignite.cache.*; -import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.query.*; import org.apache.ignite.internal.processors.query.*; @@ -280,7 +279,7 @@ public class VisorQueryTask extends VisorOneNodeTask<VisorQueryTask.VisorQueryAr * @param id Unique query result id. 
*/ private void scheduleResultSetHolderRemoval(final String id) { - ((IgniteKernal)ignite).context().timeout().addTimeoutObject(new GridTimeoutObjectAdapter(RMV_DELAY) { + ignite.context().timeout().addTimeoutObject(new GridTimeoutObjectAdapter(RMV_DELAY) { @Override public void onTimeout() { ConcurrentMap<String, VisorFutureResultSetHolder> storage = ignite.cluster().nodeLocalMap(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a366928f/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteExceptionRegistrySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteExceptionRegistrySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteExceptionRegistrySelfTest.java index 5fa443d..34320b2 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteExceptionRegistrySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteExceptionRegistrySelfTest.java @@ -47,15 +47,13 @@ public class IgniteExceptionRegistrySelfTest extends GridCommonAbstractTest { for (int i = 0; i < expCnt; i++) registry.onException("Test " + i, new Exception("Test " + i)); - Collection<IgniteExceptionRegistry.ExceptionInfo> exceptions = registry.getErrors(); + Collection<IgniteExceptionRegistry.ExceptionInfo> errors = registry.getErrors(0); - assertEquals(expCnt, registry.getErrors().size()); - assertEquals(expCnt, registry.getErrors().size()); - assertEquals(expCnt, registry.getErrors().size()); + assertEquals(expCnt, errors.size()); int i = expCnt - 1; - for (IgniteExceptionRegistry.ExceptionInfo e : exceptions) { + for (IgniteExceptionRegistry.ExceptionInfo e : errors) { assertNotNull(e); assertEquals(e.message(), "Test " + i); assertEquals(e.threadId(), Thread.currentThread().getId()); @@ -82,8 +80,8 @@ public class IgniteExceptionRegistrySelfTest extends GridCommonAbstractTest { } }, 10, 
"TestSetMaxSize"); - int size = registry.getErrors().size(); + int size = registry.getErrors(0).size(); assert maxSize + 1 >= size && maxSize - 1 <= size; } -} \ No newline at end of file +}