IGNITE-306 Added exception registry.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b0c8d7ab Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b0c8d7ab Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b0c8d7ab Branch: refs/heads/ignite-sql-tests Commit: b0c8d7aba03b904a8882d78cadbe12466fbf6e3b Parents: e86c69e Author: nikolay_tikhonov <ntikho...@gridgain.com> Authored: Thu Feb 19 11:16:12 2015 +0300 Committer: nikolay_tikhonov <ntikho...@gridgain.com> Committed: Tue Feb 24 13:24:01 2015 +0300 ---------------------------------------------------------------------- .../apache/ignite/IgniteSystemProperties.java | 5 + .../ignite/internal/GridKernalContext.java | 8 + .../ignite/internal/GridKernalContextImpl.java | 11 + .../apache/ignite/internal/IgniteKernal.java | 6 + .../internal/managers/GridManagerAdapter.java | 5 + .../internal/util/IgniteExceptionRegistry.java | 222 +++++++++++++++++++ .../org/apache/ignite/mxbean/IgniteMXBean.java | 6 + .../org/apache/ignite/spi/IgniteSpiAdapter.java | 6 + .../org/apache/ignite/spi/IgniteSpiContext.java | 8 + .../communication/tcp/TcpCommunicationSpi.java | 53 ++++- .../spi/discovery/tcp/TcpDiscoverySpi.java | 71 +++++- .../util/IgniteExceptionRegistrySelfTest.java | 89 ++++++++ .../testframework/GridSpiTestContext.java | 6 + .../junits/GridTestKernalContext.java | 1 + 14 files changed, 488 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b0c8d7ab/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java index 7280d2d..547cbc6 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java @@ -339,6 +339,11 @@ public final class IgniteSystemProperties { public static final String IGNITE_MBEAN_APPEND_JVM_ID = "IGNITE_MBEAN_APPEND_JVM_ID"; /** + * Property controlling size of buffer holding last exception. Default value of {@code 1000}. + */ + public static final String IGNITE_EXCEPTION_REGISTRY_MAX_SIZE = "IGNITE_EXCEPTION_REGISTRY_MAX_SIZE"; + + /** * Enforces singleton. */ private IgniteSystemProperties() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b0c8d7ab/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java index 40421d1..100ad28 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java @@ -55,6 +55,7 @@ import org.apache.ignite.internal.processors.session.*; import org.apache.ignite.internal.processors.streamer.*; import org.apache.ignite.internal.processors.task.*; import org.apache.ignite.internal.processors.timeout.*; +import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.plugin.*; @@ -500,4 +501,11 @@ public interface GridKernalContext extends Iterable<GridComponent> { * messages. */ public ExecutorService getRestExecutorService(); + + /** + * Gets exception registry. + * + * @return Exception registry. 
+ */ + public IgniteExceptionRegistry exceptionRegistry(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b0c8d7ab/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java index 6f31bf7..395ad52 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java @@ -60,6 +60,7 @@ import org.apache.ignite.internal.processors.spring.*; import org.apache.ignite.internal.processors.streamer.*; import org.apache.ignite.internal.processors.task.*; import org.apache.ignite.internal.processors.timeout.*; +import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; @@ -296,6 +297,9 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable /** Performance suggestions. */ private final GridPerformanceSuggestions perf = new GridPerformanceSuggestions(); + /** Exception registry. */ + private IgniteExceptionRegistry registry; + /** * No-arg constructor is required by externalization. 
*/ @@ -324,6 +328,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable IgniteEx grid, IgniteConfiguration cfg, GridKernalGateway gw, + IgniteExceptionRegistry registry, ExecutorService utilityCachePool, ExecutorService execSvc, ExecutorService sysExecSvc, @@ -338,6 +343,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable this.grid = grid; this.cfg = cfg; this.gw = gw; + this.registry = registry; this.utilityCachePool = utilityCachePool; this.execSvc = execSvc; this.sysExecSvc = sysExecSvc; @@ -843,6 +849,11 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable } /** {@inheritDoc} */ + @Override public IgniteExceptionRegistry exceptionRegistry() { + return registry; + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridKernalContextImpl.class, this); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b0c8d7ab/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index a5d19c0..8a21477 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -401,6 +401,11 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { } /** {@inheritDoc} */ + @Override public void printLastErrors() { + ctx.exceptionRegistry().printErrors(); + } + + /** {@inheritDoc} */ @Override public String getVmName() { return ManagementFactory.getRuntimeMXBean().getName(); } @@ -664,6 +669,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { this, cfg, gw, + new IgniteExceptionRegistry(log), utilityCachePool, execSvc, sysExecSvc, 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b0c8d7ab/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java index 30ea854..35e8989 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java @@ -25,6 +25,7 @@ import org.apache.ignite.internal.*; import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.managers.eventstorage.*; import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; @@ -545,6 +546,10 @@ public abstract class GridManagerAdapter<T extends IgniteSpi> implements GridMan return ctx.io().messageFactory(); } + @Override public IgniteExceptionRegistry exceptionRegistry() { + return ctx.exceptionRegistry(); + } + /** * @param e Exception to handle. * @return GridSpiException Converted exception. 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b0c8d7ab/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteExceptionRegistry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteExceptionRegistry.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteExceptionRegistry.java new file mode 100644 index 0000000..d2fca6e --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteExceptionRegistry.java @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util; + +import org.apache.ignite.*; +import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.spi.communication.tcp.*; +import org.apache.ignite.spi.discovery.tcp.*; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.IgniteSystemProperties.*; + +/** + * Class collects errors from {@link TcpCommunicationSpi} and {@link TcpDiscoverySpi}. 
+ */ +public class IgniteExceptionRegistry { + /** */ + public static final int DEFAULT_QUEUE_SIZE = 1000; + + /** */ + private int maxSize = IgniteSystemProperties.getInteger(IGNITE_EXCEPTION_REGISTRY_MAX_SIZE, DEFAULT_QUEUE_SIZE); + + /** */ + private AtomicLong errorCnt = new AtomicLong(); + + /** */ + private final ConcurrentLinkedDeque<IgniteExceptionInfo> queue = new ConcurrentLinkedDeque<>(); + + /** */ + private final IgniteLogger log; + + /** + * Constructor. + * + * @param log Ignite logger. + */ + public IgniteExceptionRegistry(IgniteLogger log) { + this.log = log; + } + + /** + * Puts exception into queue. + * Thread-safe. + * + * @param e Exception. + */ + public void onException(String msg, Throwable e) { + errorCnt.incrementAndGet(); + + // Remove extra entity. + while (queue.size() >= maxSize) + queue.pollLast(); + + queue.offerFirst(new IgniteExceptionInfo(e, msg, Thread.currentThread().getId(), + Thread.currentThread().getName(), U.currentTimeMillis())); + } + + /** + * Gets exceptions. + * + * @return Exceptions. + */ + Collection<IgniteExceptionInfo> getErrors() { + int size = queue.size(); + + List<IgniteExceptionInfo> errors = new ArrayList<>(size); + + int cnt = 0; + + for (IgniteExceptionInfo entry : queue) { + if (cnt < size) + errors.add(entry); + else + break; + + ++cnt; + } + + return errors; + } + + /** + * Sets max size. Default value {@link #DEFAULT_QUEUE_SIZE} + * + * @param maxSize Max size. + */ + public void setMaxSize(int maxSize) { + A.ensure(maxSize > 0, "Max queue size must be greater than 0."); + + this.maxSize = maxSize; + } + + /** + * Prints errors. 
+ */ + public void printErrors() { + int size = queue.size(); + + int cnt = 0; + + Iterator<IgniteExceptionInfo> descIter = queue.descendingIterator(); + + while (descIter.hasNext() && cnt < size){ + IgniteExceptionInfo error = descIter.next(); + + log.error( + "Time of occurrence: " + new Date(error.time()) + "\n" + + "Error message: " + error.message() + "\n" + + "Thread id: " + error.threadId() + "\n" + + "Thread name: " + error.threadName(), + error.exception() + ); + + ++cnt; + } + } + + /** + * Errors count. + * + * @return Errors count. + */ + public long errorCount() { + return errorCnt.get(); + } + + /** + * + */ + static class IgniteExceptionInfo { + /** */ + @GridToStringExclude + private final Throwable exception; + + /** */ + private final long threadId; + + /** */ + private final String threadName; + + /** */ + private final long time; + + /** */ + private String msg; + + /** + * Constructor. + * + * @param exception Exception. + * @param threadId Thread id. + * @param threadName Thread name. + * @param time Occurrence time. + */ + public IgniteExceptionInfo(Throwable exception, String msg, long threadId, String threadName, long time) { + this.exception = exception; + this.threadId = threadId; + this.threadName = threadName; + this.time = time; + this.msg = msg; + } + + /** + * @return Gets message. + */ + public String message() { + return msg; + } + + /** + * @return Exception. + */ + public Throwable exception() { + return exception; + } + + /** + * @return Gets thread id. + */ + public long threadId() { + return threadId; + } + + /** + * @return Gets thread name. + */ + public String threadName() { + return threadName; + } + + /** + * @return Gets time. 
+ */ + public long time() { + return time; + } + + /** {@inheritDoc} */ + public String toString() { + return S.toString(IgniteExceptionInfo.class, this); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b0c8d7ab/modules/core/src/main/java/org/apache/ignite/mxbean/IgniteMXBean.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/mxbean/IgniteMXBean.java b/modules/core/src/main/java/org/apache/ignite/mxbean/IgniteMXBean.java index 15dfde5..7cf83fb 100644 --- a/modules/core/src/main/java/org/apache/ignite/mxbean/IgniteMXBean.java +++ b/modules/core/src/main/java/org/apache/ignite/mxbean/IgniteMXBean.java @@ -359,4 +359,10 @@ public interface IgniteMXBean { */ @MXBeanDescription("Optional kernal instance name.") public String getInstanceName(); + + /** + * Prints errors. + */ + @MXBeanDescription("Print errors.") + public void printLastErrors(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b0c8d7ab/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java index eabec8d..7e2fb9c 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java @@ -23,6 +23,7 @@ import org.apache.ignite.events.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.managers.eventstorage.*; +import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.plugin.extensions.communication.*; @@ -712,5 +713,10 @@ public abstract class IgniteSpiAdapter 
implements IgniteSpi, IgniteSpiManagement @Override public MessageFactory messageFactory() { return null; } + + /** {@inheritDoc} */ + @Override public IgniteExceptionRegistry exceptionRegistry() { + return null; + } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b0c8d7ab/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java index 5a0a23f..02dac66 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java @@ -22,6 +22,7 @@ import org.apache.ignite.cluster.*; import org.apache.ignite.events.*; import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.managers.eventstorage.*; +import org.apache.ignite.internal.util.*; import org.apache.ignite.plugin.extensions.communication.*; import org.apache.ignite.plugin.security.*; import org.apache.ignite.spi.swapspace.*; @@ -389,4 +390,11 @@ public interface IgniteSpiContext { * @return Message factory. */ public MessageFactory messageFactory(); + + /** + * Gets exception registry. + * + * @return Exception registry. 
+ */ + public IgniteExceptionRegistry exceptionRegistry(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b0c8d7ab/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index ad9f688..1f15465 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -1552,6 +1552,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (log.isDebugEnabled()) log.debug("Failed to bind to local port (will try next port within range) [port=" + port + ", locHost=" + locHost + ']'); + + onException("Failed to bind to local port (will try next port within range) [port=" + port + + ", locHost=" + locHost + ']', e); } } @@ -1601,6 +1604,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (log.isDebugEnabled()) log.debug("Failed to bind to local port (will try next port within range) [port=" + port + ", locHost=" + locHost + ']'); + + onException("Failed to bind to local port (will try next port within range) [port=" + port + + ", locHost=" + locHost + ']', e); } } @@ -1868,9 +1874,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter LT.warn(log, null, OUT_OF_RESOURCES_TCP_MSG); else if (getSpiContext().node(node.id()) != null) LT.warn(log, null, e.getMessage()); - else if (log.isDebugEnabled()) - log.debug("Failed to establish shared memory connection with local node (node has left): " + - node.id()); + else { + if (log.isDebugEnabled()) + log.debug("Failed to establish shared memory connection with local node (node has left): " + + node.id()); + + onException("Failed to establish shared 
memory connection with local node (node has left): " + + node.id(), e); + } } } @@ -1912,6 +1923,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter safeHandshake(client, null, node.id(), connTimeout0); } catch (HandshakeTimeoutException e) { + onException("Handshake timedout (will retry with increased timeout) [timeout=" + connTimeout0 + + ", client=" + client + ']', e); + if (log.isDebugEnabled()) log.debug("Handshake timedout (will retry with increased timeout) [timeout=" + connTimeout0 + ", err=" + e.getMessage() + ", client=" + client + ']'); @@ -2067,6 +2081,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter client = null; } + onException("Handshake timedout (will retry with increased timeout) [timeout=" + connTimeout0 + + ", addr=" + addr + ']', e); + if (log.isDebugEnabled()) log.debug( "Handshake timedout (will retry with increased timeout) [timeout=" + connTimeout0 + @@ -2104,6 +2121,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter client = null; } + onException("Client creation failed. Addr=" + addr + '.', e); + if (log.isDebugEnabled()) log.debug("Client creation failed [addr=" + addr + ", err=" + e + ']'); @@ -2329,6 +2348,15 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter return recovery; } + /** + * @param msg Error message. + * @param e Exception. 
+ */ + private void onException(String msg, Exception e){ + if (super.getSpiContext().exceptionRegistry() != null) + super.getSpiContext().exceptionRegistry().onException(msg, e); + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(TcpCommunicationSpi.class, this); @@ -2645,8 +2673,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter catch (IOException e) { if (getSpiContext().pingNode(entry.getKey())) U.error(log, "Failed to flush client: " + client, e); - else if (log.isDebugEnabled()) - log.debug("Failed to flush client (node left): " + client); + else { + if (log.isDebugEnabled()) + log.debug("Failed to flush client (node left): " + client); + + onException("Failed to flush client (node left): " + client, e); + } } finally { if (err) @@ -2805,9 +2837,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter addReconnectRequest(recoveryDesc); } - else if (log.isDebugEnabled()) - log.debug("Recovery reconnect failed, " + - "node left [rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']'); + else { + if (log.isDebugEnabled()) + log.debug("Recovery reconnect failed, " + + "node left [rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']'); + + onException("Recovery reconnect failed, node left [rmtNode=" + recoveryDesc.node().id() + "]", + e); + } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b0c8d7ab/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index 7501a66..11167e4 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -1080,7 +1080,7 @@ public class TcpDiscoverySpi 
extends TcpDiscoverySpiAdapter implements TcpDiscov return true; } catch (UnknownHostException ignored) { - // No-op. + onException(ignored.getMessage(), ignored); } } @@ -1144,6 +1144,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov if (log.isDebugEnabled()) log.debug("Failed to ping node [node=" + node + ", err=" + e.getMessage() + ']'); + onException("Failed to ping node=" + node, e); // continue; } } @@ -1451,6 +1452,8 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov log.debug("Failed to send join request message [addr=" + addr + ", msg=" + ioe != null ? ioe.getMessage() : e.getMessage() + ']'); + + onException("Failed to send join request message addr=" + addr, ioe); } noResAddrs.add(addr); @@ -1580,6 +1583,8 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov if (log.isDebugEnabled()) U.error(log, "Class cast exception on direct send: " + addr, e); + onException("Class cast exception on direct send.", e); + if (errs == null) errs = new ArrayList<>(); @@ -1589,6 +1594,8 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov if (log.isDebugEnabled()) log.error("Exception on direct send: " + e.getMessage(), e); + onException("Exception on direct send.", e); + if (errs == null) errs = new ArrayList<>(); @@ -1747,6 +1754,15 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov } /** + * @param msg Error message. + * @param e Exception. + */ + private void onException(String msg, Exception e){ + if (super.getSpiContext().exceptionRegistry() != null) + super.getSpiContext().exceptionRegistry().onException(msg, e); + } + + /** * @param node Node. * @return {@link LinkedHashSet} of internal and external addresses of provided node. * Internal addresses placed before external addresses. 
@@ -2734,6 +2750,8 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov if (log.isDebugEnabled()) log.debug("Failed to connect to next node [msg=" + msg + ", err=" + e + ']'); + onException("Failed to connect to next node msg=" + msg, e); + if (!openSock) break; // Don't retry if we can not establish connection. @@ -2856,6 +2874,9 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov U.error(log, "Failed to send message to next node [next=" + next.id() + ", msg=" + msg + ", err=" + e + ']', e); + onException("Failed to send message to next node [next=" + next.id() + ", msg=" + + msg + ']', e); + if (e instanceof SocketTimeoutException || X.hasCause(e, SocketTimeoutException.class)) { ackTimeout0 *= 2; @@ -3018,6 +3039,8 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov if (log.isDebugEnabled()) log.debug("Failed to send loopback problem message to node " + "[node=" + node + ", err=" + e.getMessage() + ']'); + + onException("Failed to send loopback problem message to node=" + node, e); } // Ignore join request. @@ -3048,6 +3071,9 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov log.debug("Failed to send duplicate ID message to node " + "[node=" + node + ", existingNode=" + existingNode + ", err=" + e.getMessage() + ']'); + + onException("Failed to send duplicate ID message to node " + + "[node=" + node + ", existingNode=" + existingNode + ']', e); } // Output warning. @@ -3091,6 +3117,8 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov if (log.isDebugEnabled()) log.debug("Failed to send unauthenticated message to node " + "[node=" + node + ", err=" + e.getMessage() + ']'); + + onException("Failed to send unauthenticated message to node=" + node, e); } // Ignore join request. 
@@ -3141,6 +3169,8 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov log.debug("Failed to authenticate node (will ignore join request) [node=" + node + ", err=" + e + ']'); + onException("Failed to authenticate node (will ignore join request). Node=" + node, e); + // Ignore join request. return; } @@ -3175,6 +3205,8 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov if (log.isDebugEnabled()) log.debug("Failed to send hash ID resolver validation failed message to node " + "[node=" + node + ", err=" + e.getMessage() + ']'); + + onException("Failed to send hash ID resolver validation failed message to node=" + node, e); } // Ignore join request. @@ -3218,6 +3250,8 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov if (log.isDebugEnabled()) log.debug("Failed to send version check failed message to node " + "[node=" + node + ", err=" + e.getMessage() + ']'); + + onException("Failed to send version check failed message to node=" + node, e); } // Ignore join request. @@ -3272,6 +3306,8 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov if (log.isDebugEnabled()) log.debug("Failed to send version check failed message to node " + "[node=" + node + ", err=" + e.getMessage() + ']'); + + onException("Failed to send version check failed message to node=" + node, e); } // Ignore join request. @@ -3313,6 +3349,8 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov if (log.isDebugEnabled()) log.debug("Failed to send marshaller check failed message to node " + "[node=" + node + ", err=" + e.getMessage() + ']'); + + onException("Failed to send marshaller check failed message to node=" + node, e); } // Ignore join request. 
@@ -3550,6 +3588,8 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov if (log.isDebugEnabled()) log.debug("Failed to send unauthenticated message to node " + "[node=" + node + ", err=" + e.getMessage() + ']'); + + onException("Failed to send unauthenticated message to node=" + node, e); } addMessage(new TcpDiscoveryNodeFailedMessage(locNodeId, node.id(), @@ -3750,6 +3790,8 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov if (log.isDebugEnabled()) log.debug("Failed to register new node address [node=" + node + ", err=" + e.getMessage() + ']'); + + onException("Failed to register new node address " + node, e); } } @@ -3881,6 +3923,8 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov catch (IgniteSpiException ignored) { if (log.isDebugEnabled()) log.debug("Failed to unregister left node address: " + leftNode); + + onException("Failed to unregister left node address: " + leftNode, ignored); } } @@ -3921,6 +3965,8 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov if (log.isDebugEnabled()) log.debug("Failed to send verified node left message to leaving node [msg=" + msg + ", err=" + e.getMessage() + ']'); + + onException("Failed to send verified node left message to leaving node, msg=" + msg, e); } finally { forceSndPending = true; @@ -4056,6 +4102,8 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov if (log.isDebugEnabled()) log.debug("Failed to unregister failed node address [node=" + node + ", err=" + e.getMessage() + ']'); + + onException("Failed to unregister failed node address, node=" + node, e); } } @@ -4158,6 +4206,9 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov log.debug("Failed to respond to status check message (connection refused) " + "[recipient=" + msg.creatorNodeId() + ", status=" + msg.status() + ']'); } + + onException("Failed to respond to status 
check message (connection refused) " + + "[recipient=" + msg.creatorNodeId() + ", status=" + msg.status() + ']', e); } else { if (pingNode(msg.creatorNode())) { @@ -4408,6 +4459,9 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov if (log.isDebugEnabled()) log.debug("Failed to bind to local port (will try next port within range) " + "[port=" + port + ", localHost=" + locHost + ']'); + + onException("Failed to bind to local port. " + + "[port=" + port + ", localHost=" + locHost + ']', e); } else { throw new IgniteSpiException("Failed to bind TCP server socket (possibly all ports in range " + @@ -4447,6 +4501,8 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov if (log.isDebugEnabled()) U.error(log, "Failed to accept TCP connection.", e); + onException("Failed to accept TCP connection.", e); + if (!isInterrupted()) { if (U.isMacInvalidArgumentError(e)) U.error(log, "Failed to accept TCP connection\n\t" + U.MAC_INVALID_ARG_MSG, e); @@ -4624,6 +4680,8 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov LT.error(log, e, "Failed to initialize connection [sock=" + sock + ']'); } + onException("Caught exception on handshake [err=" + e + ", sock=" + sock + ']', e); + return; } catch (IgniteCheckedException e) { @@ -4646,6 +4704,8 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov (!sock.isClosed() && !e.hasCause(IOException.class))) LT.error(log, e, "Failed to initialize connection [sock=" + sock + ']'); + onException("Caught exception on handshake. 
Sock=" + sock, e); + return; } @@ -4857,6 +4917,9 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov LT.error(log, e, "Failed to read message [sock=" + sock + ", locNodeId=" + locNodeId + ", rmtNodeId=" + nodeId + ']'); + onException("Caught exception on message read [sock=" + sock + + ", locNodeId=" + locNodeId + ", rmtNodeId=" + nodeId + ']', e); + return; } catch (IOException e) { @@ -4875,6 +4938,9 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov LT.error(log, e, "Failed to send receipt on message [sock=" + sock + ", locNodeId=" + locNodeId + ", rmtNodeId=" + nodeId + ']'); + onException("Caught exception on message read [sock=" + sock + ", locNodeId=" + locNodeId + + ", rmtNodeId=" + nodeId + ']', e); + return; } } @@ -5096,6 +5162,9 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov U.error(log, "Client connection failed [sock=" + sock + ", locNodeId=" + ignite.configuration().getNodeId() + ", rmtNodeId=" + nodeId + ", msg=" + msg + ']', e); + onException("Client connection failed [sock=" + sock + ", locNodeId=" + + ignite.configuration().getNodeId() + ", rmtNodeId=" + nodeId + ", msg=" + msg + ']', e); + U.interrupt(clientMsgWorkers.remove(nodeId)); U.closeQuiet(sock); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b0c8d7ab/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteExceptionRegistrySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteExceptionRegistrySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteExceptionRegistrySelfTest.java new file mode 100644 index 0000000..14320f5 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteExceptionRegistrySelfTest.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * 
contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util; + +import org.apache.ignite.testframework.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.util.*; +import java.util.concurrent.*; + +/** + * Tests for {@link IgniteExceptionRegistry}. + */ +@GridCommonTest(group = "Utils") +public class IgniteExceptionRegistrySelfTest extends GridCommonAbstractTest { + /** Registry under test, re-created before every test. */ + private IgniteExceptionRegistry registry; + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + registry = new IgniteExceptionRegistry(new GridStringLogger()); + } + + /** Registers exceptions from a single thread and checks that they are reported newest-first. + * @throws Exception if failed. 
+ */ + public void testOnException() throws Exception { + int expCnt = 150; + + for (int i = 0; i < expCnt; i++) + registry.onException("Test " + i, new Exception("Test " + i)); + + Collection<IgniteExceptionRegistry.IgniteExceptionInfo> exceptions = registry.getErrors(); + + assertEquals(expCnt, exceptions.size()); + assertEquals(expCnt, registry.getErrors().size()); + assertEquals(expCnt, registry.getErrors().size()); + + int i = expCnt - 1; + + for (IgniteExceptionRegistry.IgniteExceptionInfo e : exceptions) { + assertNotNull(e); + assertEquals("Test " + i, e.message()); + assertEquals(Thread.currentThread().getId(), e.threadId()); + assertEquals(Thread.currentThread().getName(), e.threadName()); + + --i; + } + } + + /** Registers exceptions from 10 threads and checks that the registry size stays within one of the max size. + * @throws Exception if failed. + */ + public void testMultiThreadedMaxSize() throws Exception { + final int maxSize = 10; + + registry.setMaxSize(maxSize); + + GridTestUtils.runMultiThreaded(new Callable<Object>() { + @Override public Object call() throws Exception { + for (int i = 0; i < maxSize; i++) + registry.onException("Test " + i, new Exception("test")); + + return null; + } + }, 10, "TestSetMaxSize"); + + int size = registry.getErrors().size(); + + assertTrue("Registry size is out of bounds [maxSize=" + maxSize + ", size=" + size + ']', maxSize + 1 >= size && maxSize - 1 <= size); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b0c8d7ab/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java index ad59f18..ff8cee4 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java @@ -24,6 +24,7 @@ import org.apache.ignite.internal.*; import 
org.apache.ignite.internal.direct.*; import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.managers.eventstorage.*; +import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.lang.*; import org.apache.ignite.plugin.extensions.communication.*; @@ -529,6 +530,11 @@ public class GridSpiTestContext implements IgniteSpiContext { } /** {@inheritDoc} */ + @Override public IgniteExceptionRegistry exceptionRegistry() { + return null; /* This test context always returns null here. NOTE(review): callers presumably must tolerate a null registry - confirm. */ + } + + /** {@inheritDoc} */ @Override public MessageFactory messageFactory() { if (factory == null) factory = new GridIoMessageFactory(null); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b0c8d7ab/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java index d9c9d6d..05b6f80 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java @@ -44,6 +44,7 @@ public class GridTestKernalContext extends GridKernalContextImpl { null, null, null, + null, null); GridTestUtils.setFieldValue(grid(), "cfg", config());