Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-496 c4f6e7563 -> 227220ae6


# IGNITE-368 WIP. Refactored suppressed errors regestry. Created Visor task. 
Minor code cleanup.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a366928f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a366928f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a366928f

Branch: refs/heads/ignite-496
Commit: a366928f84be9b04393c15fbaf1f5e2fd3ca1dcb
Parents: 47539d8
Author: AKuznetsov <akuznet...@gridgain.com>
Authored: Fri Feb 27 16:09:14 2015 +0700
Committer: AKuznetsov <akuznet...@gridgain.com>
Committed: Fri Feb 27 16:09:14 2015 +0700

----------------------------------------------------------------------
 .../org/apache/ignite/internal/IgniteEx.java    |  7 ++
 .../internal/util/IgniteExceptionRegistry.java  | 64 ++++++++-----
 .../apache/ignite/internal/visor/VisorJob.java  |  3 +
 .../internal/visor/VisorMultiNodeTask.java      |  1 +
 .../compute/VisorComputeMonitoringHolder.java   | 10 +--
 .../visor/node/VisorNodeDataCollectorJob.java   |  3 +-
 .../visor/node/VisorNodeLastErrorsTask.java     | 94 ++++++++++++++++++++
 .../internal/visor/query/VisorQueryTask.java    |  3 +-
 .../util/IgniteExceptionRegistrySelfTest.java   | 12 ++-
 9 files changed, 161 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a366928f/modules/core/src/main/java/org/apache/ignite/internal/IgniteEx.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/IgniteEx.java 
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteEx.java
index 3c35a08..7bb958d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteEx.java
@@ -142,4 +142,11 @@ public interface IgniteEx extends Ignite {
      * @return Local grid node.
      */
     public ClusterNode localNode();
+
+    /**
+     * Internal context.
+     *
+     * @return Kernal context.
+     */
+    public GridKernalContext context();
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a366928f/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteExceptionRegistry.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteExceptionRegistry.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteExceptionRegistry.java
index ab1d9bf..8a88fc4 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteExceptionRegistry.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteExceptionRegistry.java
@@ -21,6 +21,7 @@ import org.apache.ignite.*;
 import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 
+import java.io.*;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
@@ -56,15 +57,16 @@ public class IgniteExceptionRegistry {
      */
     public IgniteExceptionRegistry(IgniteLogger log) {
         this.log = log;
-        this.queue = new ConcurrentLinkedDeque<>();
+
+        queue = new ConcurrentLinkedDeque<>();
     }
 
     /**
      * Default constructor.
      */
     protected IgniteExceptionRegistry() {
-        this.log = null;
-        this.queue = null;
+        log = null;
+        queue = null;
     }
 
     /**
@@ -74,26 +76,31 @@ public class IgniteExceptionRegistry {
      * @param e Exception.
      */
     public void onException(String msg, Throwable e) {
-        errorCnt.incrementAndGet();
+        long order = errorCnt.incrementAndGet();
 
         // Remove extra entity.
         while (queue.size() >= maxSize)
             queue.pollLast();
 
-        queue.offerFirst(new ExceptionInfo(e, msg, 
Thread.currentThread().getId(),
+        queue.offerFirst(new ExceptionInfo(order, e, msg, 
Thread.currentThread().getId(),
             Thread.currentThread().getName(), U.currentTimeMillis()));
     }
 
     /**
-     * Gets exceptions.
+     * Gets suppressed errors.
      *
-     * @return Exceptions.
+     * @param order Order number to filter errors.
+     * @return List of exceptions that happened after specified order.
      */
-    Collection<ExceptionInfo> getErrors() {
+    public List<ExceptionInfo> getErrors(long order) {
         List<ExceptionInfo> errors = new ArrayList<>();
 
-        for (ExceptionInfo entry : queue)
-            errors.add(entry);
+        for (ExceptionInfo error : queue) {
+            if (error.order <= order)
+                break;
+
+            errors.add(error);
+        }
 
         return errors;
     }
@@ -144,9 +151,16 @@ public class IgniteExceptionRegistry {
     }
 
     /**
-     *
+     * Detailed info about suppressed error.
      */
-    static class ExceptionInfo {
+    @SuppressWarnings("PublicInnerClass")
+    public static class ExceptionInfo implements Serializable {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** */
+        private final long order;
+
         /** */
         @GridToStringExclude
         private final Throwable error;
@@ -166,13 +180,16 @@ public class IgniteExceptionRegistry {
         /**
          * Constructor.
          *
-         * @param exception Exception.
-         * @param threadId Thread id.
+         * @param order Locally unique ID that is atomically incremented for 
each new error.
+         * @param error Suppressed error.
+         * @param msg Message that describe reason why error was suppressed.
+         * @param threadId Thread ID.
          * @param threadName Thread name.
          * @param time Occurrence time.
          */
-        public ExceptionInfo(Throwable exception, String msg, long threadId, 
String threadName, long time) {
-            this.error = exception;
+        public ExceptionInfo(long order, Throwable error, String msg, long 
threadId, String threadName, long time) {
+            this.order = order;
+            this.error = error;
             this.threadId = threadId;
             this.threadName = threadName;
             this.time = time;
@@ -180,21 +197,28 @@ public class IgniteExceptionRegistry {
         }
 
         /**
-         * @return Gets message.
+         * Locally unique ID that is atomically incremented for each new error.
+         */
+        public long order() {
+            return order;
+        }
+
+        /**
+         * @return Gets message that describe reason why error was suppressed.
          */
         public String message() {
             return msg;
         }
 
         /**
-         * @return Exception.
+         * @return Suppressed error.
          */
         public Throwable error() {
             return error;
         }
 
         /**
-         * @return Gets thread id.
+         * @return Gets thread ID.
          */
         public long threadId() {
             return threadId;
@@ -237,7 +261,7 @@ public class IgniteExceptionRegistry {
         }
 
         /** {@inheritDoc} */
-        @Override Collection<ExceptionInfo> getErrors() {
+        @Override public List<ExceptionInfo> getErrors(long order) {
             return Collections.emptyList();
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a366928f/modules/core/src/main/java/org/apache/ignite/internal/visor/VisorJob.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/visor/VisorJob.java 
b/modules/core/src/main/java/org/apache/ignite/internal/visor/VisorJob.java
index c8a99f7..34414b5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/VisorJob.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/VisorJob.java
@@ -28,6 +28,9 @@ import static 
org.apache.ignite.internal.visor.util.VisorTaskUtils.*;
 
 /**
  * Base class for Visor jobs.
+ *
+ * @param <A> Job argument type.
+ * @param <R> Job result type.
  */
 public abstract class VisorJob<A, R> extends ComputeJobAdapter {
     /** Auto-injected grid instance. */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a366928f/modules/core/src/main/java/org/apache/ignite/internal/visor/VisorMultiNodeTask.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/visor/VisorMultiNodeTask.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/visor/VisorMultiNodeTask.java
index b92cfe2..7295760 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/visor/VisorMultiNodeTask.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/visor/VisorMultiNodeTask.java
@@ -34,6 +34,7 @@ import static 
org.apache.ignite.internal.visor.util.VisorTaskUtils.*;
  *
  * @param <A> Task argument type.
  * @param <R> Task result type.
+ * @param <J> Job result type
  */
 public abstract class VisorMultiNodeTask<A, R, J> implements 
ComputeTask<VisorTaskArgument<A>, R> {
     /** Auto-injected grid instance. */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a366928f/modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorComputeMonitoringHolder.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorComputeMonitoringHolder.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorComputeMonitoringHolder.java
index 2036d70..5d92df5 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorComputeMonitoringHolder.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorComputeMonitoringHolder.java
@@ -95,17 +95,17 @@ public class VisorComputeMonitoringHolder {
     /**
      * Schedule cleanup process for events monitoring.
      *
-     * @param g grid.
+     * @param ignite grid.
      */
-    private void scheduleCleanupJob(final IgniteEx g) {
-        ((IgniteKernal)g).context().timeout().addTimeoutObject(new 
GridTimeoutObjectAdapter(CLEANUP_TIMEOUT) {
+    private void scheduleCleanupJob(final IgniteEx ignite) {
+        ignite.context().timeout().addTimeoutObject(new 
GridTimeoutObjectAdapter(CLEANUP_TIMEOUT) {
             @Override public void onTimeout() {
                 synchronized (listenVisor) {
-                    if (tryDisableEvents(g)) {
+                    if (tryDisableEvents(ignite)) {
                         for (String visorKey : listenVisor.keySet())
                             listenVisor.put(visorKey, false);
 
-                        scheduleCleanupJob(g);
+                        scheduleCleanupJob(ignite);
                     }
                     else
                         cleanupStopped = true;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a366928f/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java
index 43a987c..c9c68fc 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.visor.node;
 import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
 import org.apache.ignite.configuration.*;
-import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.processors.igfs.*;
 import org.apache.ignite.internal.util.ipc.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
@@ -149,7 +148,7 @@ public class VisorNodeDataCollectorJob extends 
VisorJob<VisorNodeDataCollectorTa
      */
     protected void igfs(VisorNodeDataCollectorJobResult res) {
         try {
-            IgfsProcessorAdapter igfsProc = 
((IgniteKernal)ignite).context().igfs();
+            IgfsProcessorAdapter igfsProc = ignite.context().igfs();
 
             for (IgniteFs igfs : igfsProc.igfss()) {
                 long start0 = U.currentTimeMillis();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a366928f/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeLastErrorsTask.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeLastErrorsTask.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeLastErrorsTask.java
new file mode 100644
index 0000000..980c6a3
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeLastErrorsTask.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.visor.node;
+
+import org.apache.ignite.compute.*;
+import org.apache.ignite.internal.processors.task.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.internal.visor.*;
+import org.apache.ignite.lang.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Task to collect last errors on nodes.
+ */
+@GridInternal
+public class VisorNodeLastErrorsTask extends VisorMultiNodeTask<Map<UUID, 
Long>,
+    Map<UUID, IgniteBiTuple<Long, 
List<IgniteExceptionRegistry.ExceptionInfo>>>,
+    IgniteBiTuple<Long, List<IgniteExceptionRegistry.ExceptionInfo>>> {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** {@inheritDoc} */
+    @Override protected VisorNodeLastErrorsJob job(Map<UUID, Long> arg) {
+        return new VisorNodeLastErrorsJob(arg, debug);
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override protected Map<UUID, IgniteBiTuple<Long, 
List<IgniteExceptionRegistry.ExceptionInfo>>>
+        reduce0(List<ComputeJobResult> results) {
+        Map<UUID, IgniteBiTuple<Long, 
List<IgniteExceptionRegistry.ExceptionInfo>>> taskRes =
+            new HashMap<>(results.size());
+
+        for (ComputeJobResult res : results) {
+            IgniteBiTuple<Long, List<IgniteExceptionRegistry.ExceptionInfo>> 
jobRes = res.getData();
+
+            taskRes.put(res.getNode().id(), jobRes);
+        }
+
+        return taskRes;
+    }
+
+    /**
+     * Job to collect last errors on nodes.
+     */
+    private static class VisorNodeLastErrorsJob extends VisorJob<Map<UUID, 
Long>,
+        IgniteBiTuple<Long, List<IgniteExceptionRegistry.ExceptionInfo>>> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /**
+         * Create job with given argument.
+         *
+         * @param arg Map with last error counter.
+         * @param debug Debug flag.
+         */
+        private VisorNodeLastErrorsJob(Map<UUID, Long> arg, boolean debug) {
+            super(arg, debug);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected IgniteBiTuple<Long, 
List<IgniteExceptionRegistry.ExceptionInfo>> run(Map<UUID, Long> arg) {
+            Long no = arg.get(ignite.localNode().id());
+
+            long order = no != null ? no : 0;
+
+            List<IgniteExceptionRegistry.ExceptionInfo> errors = 
ignite.context().exceptionRegistry().getErrors(order);
+
+            return new IgniteBiTuple<>(errors.isEmpty() ? no : 
errors.get(0).order(), errors);
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(VisorNodeLastErrorsJob.class, this);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a366928f/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryTask.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryTask.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryTask.java
index 0068b8c..a0bd346 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryTask.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryTask.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.visor.query;
 
 import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
-import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.query.*;
 import org.apache.ignite.internal.processors.query.*;
@@ -280,7 +279,7 @@ public class VisorQueryTask extends 
VisorOneNodeTask<VisorQueryTask.VisorQueryAr
          * @param id Unique query result id.
          */
         private void scheduleResultSetHolderRemoval(final String id) {
-            ((IgniteKernal)ignite).context().timeout().addTimeoutObject(new 
GridTimeoutObjectAdapter(RMV_DELAY) {
+            ignite.context().timeout().addTimeoutObject(new 
GridTimeoutObjectAdapter(RMV_DELAY) {
                 @Override public void onTimeout() {
                     ConcurrentMap<String, VisorFutureResultSetHolder> storage 
= ignite.cluster().nodeLocalMap();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a366928f/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteExceptionRegistrySelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteExceptionRegistrySelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteExceptionRegistrySelfTest.java
index 5fa443d..34320b2 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteExceptionRegistrySelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteExceptionRegistrySelfTest.java
@@ -47,15 +47,13 @@ public class IgniteExceptionRegistrySelfTest extends 
GridCommonAbstractTest {
         for (int i = 0; i < expCnt; i++)
             registry.onException("Test " + i, new Exception("Test " + i));
 
-        Collection<IgniteExceptionRegistry.ExceptionInfo> exceptions = 
registry.getErrors();
+        Collection<IgniteExceptionRegistry.ExceptionInfo> errors = 
registry.getErrors(0);
 
-        assertEquals(expCnt, registry.getErrors().size());
-        assertEquals(expCnt, registry.getErrors().size());
-        assertEquals(expCnt, registry.getErrors().size());
+        assertEquals(expCnt, errors.size());
 
         int i = expCnt - 1;
 
-        for (IgniteExceptionRegistry.ExceptionInfo e : exceptions) {
+        for (IgniteExceptionRegistry.ExceptionInfo e : errors) {
             assertNotNull(e);
             assertEquals(e.message(), "Test " + i);
             assertEquals(e.threadId(), Thread.currentThread().getId());
@@ -82,8 +80,8 @@ public class IgniteExceptionRegistrySelfTest extends 
GridCommonAbstractTest {
             }
         }, 10, "TestSetMaxSize");
 
-        int size = registry.getErrors().size();
+        int size = registry.getErrors(0).size();
 
         assert maxSize + 1 >= size && maxSize - 1 <= size;
     }
-}
\ No newline at end of file
+}

Reply via email to