ignite-312 review

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/2bfb8932
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/2bfb8932
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/2bfb8932

Branch: refs/heads/ignite-264
Commit: 2bfb89322473b6579a59d114b45eb6e9532df4d6
Parents: 7423b7c
Author: Yakov Zhdanov <yzhda...@gridgain.com>
Authored: Wed Feb 25 22:07:59 2015 +0300
Committer: Yakov Zhdanov <yzhda...@gridgain.com>
Committed: Wed Feb 25 22:07:59 2015 +0300

----------------------------------------------------------------------
 .../ignite/events/DiscoveryCustomEvent.java     |  56 +++
 .../org/apache/ignite/events/EventType.java     |  12 +
 .../discovery/GridDiscoveryManager.java         |  78 ++--
 .../ignite/internal/util/IgniteUtils.java       | 364 ++-----------------
 .../ignite/spi/discovery/DiscoverySpi.java      |   7 +
 .../spi/discovery/DiscoverySpiListener.java     |  11 +-
 .../discovery/tcp/TcpClientDiscoverySpi.java    |   7 +-
 .../spi/discovery/tcp/TcpDiscoverySpi.java      |  39 +-
 .../discovery/tcp/TcpDiscoverySpiAdapter.java   |   3 +
 .../TcpDiscoveryCustomEventMessage.java         |  66 ++++
 .../internal/GridDiscoveryEventSelfTest.java    |  43 +++
 .../discovery/AbstractDiscoverySelfTest.java    |   7 +-
 12 files changed, 321 insertions(+), 372 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2bfb8932/modules/core/src/main/java/org/apache/ignite/events/DiscoveryCustomEvent.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/events/DiscoveryCustomEvent.java 
b/modules/core/src/main/java/org/apache/ignite/events/DiscoveryCustomEvent.java
new file mode 100644
index 0000000..af15055
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/events/DiscoveryCustomEvent.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.events;
+
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.io.*;
+
+/**
+ *
+ */
+public class DiscoveryCustomEvent extends DiscoveryEvent {
+    /** */
+    private Serializable data;
+
+    /**
+     * Default constructor.
+     */
+    public DiscoveryCustomEvent() {
+        type(EventType.EVT_DISCOVERY_CUSTOM_EVT);
+    }
+
+    /**
+     * @return Data.
+     */
+    public Serializable data() {
+        return data;
+    }
+
+    /**
+     * @param data New data.
+     */
+    public void data(Serializable data) {
+        this.data = data;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(DiscoveryCustomEvent.class, this, super.toString());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2bfb8932/modules/core/src/main/java/org/apache/ignite/events/EventType.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/events/EventType.java 
b/modules/core/src/main/java/org/apache/ignite/events/EventType.java
index 9f7e2c4..d9ffc2a 100644
--- a/modules/core/src/main/java/org/apache/ignite/events/EventType.java
+++ b/modules/core/src/main/java/org/apache/ignite/events/EventType.java
@@ -17,8 +17,10 @@
 
 package org.apache.ignite.events;
 
+import org.apache.ignite.internal.managers.discovery.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 
+import java.io.*;
 import java.util.*;
 
 /**
@@ -158,6 +160,16 @@ public interface EventType {
     public static final int EVT_CLIENT_NODE_RECONNECTED = 17;
 
     /**
+     * Built-in event type: custom event sent.
+     * <br>
+     * Generated when someone invoke {@link 
GridDiscoveryManager#sendCustomEvent(Serializable)}.
+     * <p>
+     *
+     * @see DiscoveryCustomEvent
+     */
+    public static final int EVT_DISCOVERY_CUSTOM_EVT = 18;
+
+    /**
      * Built-in event type: task started.
      * <p>
      * NOTE: all types in range <b>from 1 to 1000 are reserved</b> for

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2bfb8932/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 711229a..b65da50 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -255,8 +255,14 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
         }
 
         getSpi().setListener(new DiscoverySpiListener() {
-            @Override public void onDiscovery(int type, long topVer, 
ClusterNode node, Collection<ClusterNode> topSnapshot,
-                Map<Long, Collection<ClusterNode>> snapshots) {
+            @Override public void onDiscovery(
+                int type,
+                long topVer,
+                ClusterNode node,
+                Collection<ClusterNode> topSnapshot,
+                Map<Long, Collection<ClusterNode>> snapshots,
+                @Nullable Serializable data
+            ) {
                 final ClusterNode locNode = localNode();
 
                 if (snapshots != null)
@@ -270,7 +276,7 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
                 // Put topology snapshot into discovery history.
                 // There is no race possible between history maintenance and 
concurrent discovery
                 // event notifications, since SPI notifies manager about all 
events from this listener.
-                if (type != EVT_NODE_METRICS_UPDATED) {
+                if (type != EVT_NODE_METRICS_UPDATED && type != 
EVT_DISCOVERY_CUSTOM_EVT) {
                     DiscoCache cache = new DiscoCache(locNode, 
F.view(topSnapshot, F.remoteNodes(locNode.id())));
 
                     discoCacheHist.put(topVer, cache);
@@ -305,7 +311,7 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
                         ", evt=" + U.gridEventName(type) + ']';
                 }
 
-                discoWrk.addEvent(type, topVer, node, topSnapshot);
+                discoWrk.addEvent(type, topVer, node, topSnapshot, data);
             }
         });
 
@@ -778,12 +784,11 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
         // Stop receiving notifications.
         getSpi().setListener(null);
 
-        // Stop discovery worker.
+        // Stop discovery worker and metrics updater.
         U.cancel(discoWrk);
-        U.join(discoWrk, log);
-
-        // Stop metrics updater.
         U.cancel(metricsUpdater);
+
+        U.join(discoWrk, log);
         U.join(metricsUpdater, log);
 
         // Stop SPI itself.
@@ -1185,6 +1190,13 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
         ).start();
     }
 
+    /**
+     * @param evt Event.
+     */
+    public void sendCustomEvent(Serializable evt) {
+        getSpi().sendCustomEvent(evt);
+    }
+
     /** Worker for network segment checks. */
     private class SegmentCheckWorker extends GridWorker {
         /** */
@@ -1241,7 +1253,7 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
 
                     if (!segValid) {
                         discoWrk.addEvent(EVT_NODE_SEGMENTED, 0, 
getSpi().getLocalNode(),
-                            Collections.<ClusterNode>emptyList());
+                            Collections.<ClusterNode>emptyList(), null);
 
                         lastSegChkRes.set(false);
                     }
@@ -1261,7 +1273,7 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
     /** Worker for discovery events. */
     private class DiscoveryWorker extends GridWorker {
         /** Event queue. */
-        private final BlockingQueue<GridTuple4<Integer, Long, ClusterNode, 
Collection<ClusterNode>>> evts =
+        private final BlockingQueue<GridTuple5<Integer, Long, ClusterNode, 
Collection<ClusterNode>, Serializable>> evts =
             new LinkedBlockingQueue<>();
 
         /** Node segmented event fired flag. */
@@ -1292,12 +1304,7 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
                 evt.eventNode(node);
                 evt.type(type);
 
-                evt.topologySnapshot(topVer, new ArrayList<>(
-                    F.viewReadOnly(topSnapshot, new C1<ClusterNode, 
ClusterNode>() {
-                        @Override public ClusterNode apply(ClusterNode e) {
-                            return e;
-                        }
-                    }, daemonFilter)));
+                evt.topologySnapshot(topVer, U.<ClusterNode, 
ClusterNode>arrayList(topSnapshot, daemonFilter));
 
                 if (type == EVT_NODE_METRICS_UPDATED)
                     evt.message("Metrics were updated: " + node);
@@ -1327,10 +1334,16 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
          * @param node Node.
          * @param topSnapshot Topology snapshot.
          */
-        void addEvent(int type, long topVer, ClusterNode node, 
Collection<ClusterNode> topSnapshot) {
+        void addEvent(
+            int type,
+            long topVer,
+            ClusterNode node,
+            Collection<ClusterNode> topSnapshot,
+            @Nullable Serializable data
+        ) {
             assert node != null;
 
-            evts.add(F.t(type, topVer, node, topSnapshot));
+            evts.add(F.t(type, topVer, node, topSnapshot, data));
         }
 
         /**
@@ -1364,7 +1377,7 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
         /** @throws InterruptedException If interrupted. */
         @SuppressWarnings("DuplicateCondition")
         private void body0() throws InterruptedException {
-            GridTuple4<Integer, Long, ClusterNode, Collection<ClusterNode>> 
evt = evts.take();
+            GridTuple5<Integer, Long, ClusterNode, Collection<ClusterNode>, 
Serializable> evt = evts.take();
 
             int type = evt.get1();
 
@@ -1473,6 +1486,22 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
                     break;
                 }
 
+                case EVT_DISCOVERY_CUSTOM_EVT: {
+                    DiscoveryCustomEvent customEvt = new 
DiscoveryCustomEvent();
+
+                    customEvt.node(ctx.discovery().localNode());
+                    customEvt.eventNode(node);
+                    customEvt.type(type);
+                    customEvt.topologySnapshot(topVer, null);
+                    customEvt.data(evt.get5());
+
+                    assert ctx.event().isRecordable(EVT_DISCOVERY_CUSTOM_EVT);
+
+                    ctx.event().record(customEvt);
+
+                    return;
+                }
+
                 // Don't log metric update to avoid flooding the log.
                 case EVT_NODE_METRICS_UPDATED:
                     break;
@@ -2058,12 +2087,6 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
          * @param exclNode Node to exclude.
          */
         private void filterNodeMap(ConcurrentMap<String, 
Collection<ClusterNode>> map, final ClusterNode exclNode) {
-            IgnitePredicate<ClusterNode> p = new P1<ClusterNode>() {
-                @Override public boolean apply(ClusterNode e) {
-                    return exclNode.equals(e);
-                }
-            };
-
             for (String cacheName : U.cacheNames(exclNode)) {
                 String maskedName = maskNull(cacheName);
 
@@ -2073,7 +2096,10 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
                     if (oldNodes == null || oldNodes.isEmpty())
                         break;
 
-                    Collection<ClusterNode> newNodes = F.lose(oldNodes, true, 
p);
+                    Collection<ClusterNode> newNodes = new 
ArrayList<>(oldNodes);
+
+                    if (!newNodes.remove(exclNode))
+                        break;
 
                     if (map.replace(maskedName, oldNodes, newNodes))
                         break;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2bfb8932/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java 
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index e4b23f1..b9c7596 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -59,7 +59,6 @@ import java.math.*;
 import java.net.*;
 import java.nio.*;
 import java.nio.channels.*;
-import java.nio.channels.spi.*;
 import java.nio.charset.*;
 import java.security.*;
 import java.security.cert.*;
@@ -647,15 +646,7 @@ public abstract class IgniteUtils {
      * @return Nearest power of 2.
      */
     public static int ceilPow2(int v) {
-        v--;
-
-        v |= v >> 1;
-        v |= v >> 2;
-        v |= v >> 4;
-        v |= v >> 8;
-        v |= v >> 16;
-
-        return ++v;
+        return Integer.highestOneBit(v - 1) << 1;
     }
 
     /**
@@ -1633,13 +1624,11 @@ public abstract class IgniteUtils {
                     if (!itf.isLoopback()) {
                         Enumeration<InetAddress> addrs = 
itf.getInetAddresses();
 
-                        if (addrs != null) {
-                            for (InetAddress addr : asIterable(addrs)) {
-                                String hostAddr = addr.getHostAddress();
+                        for (InetAddress addr : asIterable(addrs)) {
+                            String hostAddr = addr.getHostAddress();
 
-                                if (!addr.isLoopbackAddress() && 
!ips.contains(hostAddr))
-                                    ips.add(hostAddr);
-                            }
+                            if (!addr.isLoopbackAddress() && 
!ips.contains(hostAddr))
+                                ips.add(hostAddr);
                         }
                     }
                 }
@@ -2832,15 +2821,8 @@ public abstract class IgniteUtils {
         if (s == null)
             return;
 
-        OutputStream out = null;
-
-        try {
-            out = new FileOutputStream(file, append);
-
-            if (s != null)
-                out.write(s.getBytes(charset));
-        } finally {
-            closeQuiet(out);
+        try (OutputStream out = new FileOutputStream(file, append)) {
+            out.write(s.getBytes(charset));
         }
     }
 
@@ -3311,35 +3293,17 @@ public abstract class IgniteUtils {
     }
 
     /**
-     * Checks for containment of value matching given regular expression in 
the provided array.
-     *
-     * @param arr Array of strings.
-     * @param regex Regular expression.
-     * @return {@code true} if string matching given regular expression found, 
{@code false} otherwise.
-     */
-    public static boolean containsRegexArray(String[] arr, String regex) {
-        assert arr != null;
-        assert regex != null;
-
-        for (String s : arr)
-            if (s != null && s.matches(regex))
-                return true;
-
-        return false;
-    }
-
-    /**
      * Closes given resource logging possible checked exception.
      *
      * @param rsrc Resource to close. If it's {@code null} - it's no-op.
      * @param log Logger to log possible checked exception with (optional).
      */
-    public static void close(@Nullable Closeable rsrc, @Nullable IgniteLogger 
log) {
+    public static void close(@Nullable AutoCloseable rsrc, @Nullable 
IgniteLogger log) {
         if (rsrc != null)
             try {
                 rsrc.close();
             }
-            catch (IOException e) {
+            catch (Exception e) {
                 warn(log, "Failed to close resource: " + e.getMessage());
             }
     }
@@ -3360,114 +3324,6 @@ public abstract class IgniteUtils {
     }
 
     /**
-     * Quietly closes given resource ignoring possible checked exception.
-     *
-     * @param rsrc Resource to close. If it's {@code null} - it's no-op.
-     */
-    public static void closeQuiet(@Nullable Closeable rsrc) {
-        if (rsrc != null)
-            try {
-                rsrc.close();
-            }
-            catch (IOException ignored) {
-                // No-op.
-            }
-    }
-
-    /**
-     * Closes given resource logging possible checked exception.
-     *
-     * @param rsrc Resource to close. If it's {@code null} - it's no-op.
-     * @param log Logger to log possible checked exception with (optional).
-     */
-    public static void close(@Nullable Socket rsrc, @Nullable IgniteLogger 
log) {
-        if (rsrc != null)
-            try {
-                rsrc.close();
-            }
-            catch (IOException e) {
-                warn(log, "Failed to close resource: " + e.getMessage());
-            }
-    }
-
-    /**
-     * Quietly closes given resource ignoring possible checked exception.
-     *
-     * @param rsrc Resource to close. If it's {@code null} - it's no-op.
-     */
-    public static void closeQuiet(@Nullable Socket rsrc) {
-        if (rsrc != null)
-            try {
-                rsrc.close();
-            }
-            catch (IOException ignored) {
-                // No-op.
-            }
-    }
-
-    /**
-     * Closes given resource logging possible checked exception.
-     *
-     * @param rsrc Resource to close. If it's {@code null} - it's no-op.
-     * @param log Logger to log possible checked exception with (optional).
-     */
-    public static void close(@Nullable ServerSocket rsrc, @Nullable 
IgniteLogger log) {
-        if (rsrc != null)
-            try {
-                rsrc.close();
-            }
-            catch (IOException e) {
-                warn(log, "Failed to close resource: " + e.getMessage());
-            }
-    }
-
-    /**
-     * Quietly closes given resource ignoring possible checked exception.
-     *
-     * @param rsrc Resource to close. If it's {@code null} - it's no-op.
-     */
-    public static void closeQuiet(@Nullable ServerSocket rsrc) {
-        if (rsrc != null)
-            try {
-                rsrc.close();
-            }
-            catch (IOException ignored) {
-                // No-op.
-            }
-    }
-
-    /**
-     * Closes given resource logging possible checked exception.
-     *
-     * @param rsrc Resource to close. If it's {@code null} - it's no-op.
-     * @param log Logger to log possible checked exception with (optional).
-     */
-    public static void close(@Nullable AbstractInterruptibleChannel rsrc, 
@Nullable IgniteLogger log) {
-        if (rsrc != null)
-            try {
-                rsrc.close();
-            }
-            catch (IOException e) {
-                warn(log, "Failed to close resource: " + e.getMessage());
-            }
-    }
-
-    /**
-     * Quietly closes given resource ignoring possible checked exception.
-     *
-     * @param rsrc Resource to close. If it's {@code null} - it's no-op.
-     */
-    public static void closeQuiet(@Nullable AbstractInterruptibleChannel rsrc) 
{
-        if (rsrc != null)
-            try {
-                rsrc.close();
-            }
-            catch (IOException ignored) {
-                // No-op.
-            }
-    }
-
-    /**
      * Closes given resource logging possible checked exceptions.
      *
      * @param rsrc Resource to close. If it's {@code null} - it's no-op.
@@ -3491,83 +3347,6 @@ public abstract class IgniteUtils {
     }
 
     /**
-     * Closes given resource logging possible checked exception.
-     *
-     * @param rsrc Resource to close. If it's {@code null} - it's no-op.
-     * @param log Logger to log possible checked exception with (optional).
-     */
-    public static void close(@Nullable Reader rsrc, @Nullable IgniteLogger 
log) {
-        if (rsrc != null)
-            try {
-                rsrc.close();
-            }
-            catch (IOException e) {
-                warn(log, "Failed to close resource: " + e.getMessage());
-            }
-    }
-
-    /**
-     * Quietly closes given resource ignoring possible checked exception.
-     *
-     * @param rsrc Resource to close. If it's {@code null} - it's no-op.
-     */
-    public static void closeQuiet(@Nullable Reader rsrc) {
-        if (rsrc != null)
-            try {
-                rsrc.close();
-            }
-            catch (IOException ignored) {
-                // No-op.
-            }
-    }
-
-    /**
-     * Quietly closes given resource ignoring possible checked exception.
-     *
-     * @param rsrc Resource to close. If it's {@code null} - it's no-op.
-     */
-    public static void closeQuiet(@Nullable Writer rsrc) {
-        if (rsrc != null)
-            try {
-                rsrc.close();
-            }
-            catch (IOException ignored) {
-                // No-op.
-            }
-    }
-
-    /**
-     * Closes given resource logging possible checked exception.
-     *
-     * @param rsrc Resource to close. If it's {@code null} - it's no-op.
-     * @param log Logger to log possible checked exception with (optional).
-     */
-    public static void close(@Nullable ZipFile rsrc, @Nullable IgniteLogger 
log) {
-        if (rsrc != null)
-            try {
-                rsrc.close();
-            }
-            catch (IOException e) {
-                warn(log, "Failed to close resource: " + e.getMessage());
-            }
-    }
-
-    /**
-     * Quietly closes given resource ignoring possible checked exception.
-     *
-     * @param rsrc Resource to close. If it's {@code null} - it's no-op.
-     */
-    public static void closeQuiet(@Nullable ZipFile rsrc) {
-        if (rsrc != null)
-            try {
-                rsrc.close();
-            }
-            catch (IOException ignored) {
-                // No-op.
-            }
-    }
-
-    /**
      * Closes given resource.
      *
      * @param rsrc Resource to close. If it's {@code null} - it's no-op.
@@ -3642,99 +3421,6 @@ public abstract class IgniteUtils {
     }
 
     /**
-     * Closes JDBC connection logging possible checked exception.
-     *
-     * @param rsrc JDBC connection to close. If connection is {@code null}, 
it's no-op.
-     * @param log Logger to log possible checked exception with (optional).
-     */
-    public static void close(@Nullable Connection rsrc, @Nullable IgniteLogger 
log) {
-        if (rsrc != null)
-            try {
-                rsrc.close();
-            }
-            catch (SQLException e) {
-                warn(log, "Failed to close resource: " + e.getMessage());
-            }
-    }
-
-    /**
-     * Quietly closes JDBC connection ignoring possible checked exception.
-     *
-     * @param rsrc JDBC connection to close. If connection is {@code null}, 
it's no-op.
-     */
-    public static void closeQuiet(@Nullable Connection rsrc) {
-        if (rsrc != null)
-            try {
-                rsrc.close();
-            }
-            catch (SQLException ignored) {
-                // No-op.
-            }
-    }
-
-    /**
-     * Closes JDBC statement logging possible checked exception.
-     *
-     * @param rsrc JDBC statement to close. If statement is {@code null}, it's 
no-op.
-     * @param log Logger to log possible checked exception with (optional).
-     */
-    public static void close(@Nullable Statement rsrc, @Nullable IgniteLogger 
log) {
-        if (rsrc != null)
-            try {
-                rsrc.close();
-            }
-            catch (SQLException e) {
-                warn(log, "Failed to close resource: " + e.getMessage());
-            }
-    }
-
-    /**
-     * Quietly closes JDBC statement ignoring possible checked exception.
-     *
-     * @param rsrc JDBC statement to close. If statement is {@code null}, it's 
no-op.
-     */
-    public static void closeQuiet(@Nullable Statement rsrc) {
-        if (rsrc != null)
-            try {
-                rsrc.close();
-            }
-            catch (SQLException ignored) {
-                // No-op.
-            }
-    }
-
-    /**
-     * Closes JDBC result set logging possible checked exception.
-     *
-     * @param rsrc JDBC result set to close. If result set is {@code null}, 
it's no-op.
-     * @param log Logger to log possible checked exception with (optional).
-     */
-    public static void close(@Nullable ResultSet rsrc, @Nullable IgniteLogger 
log) {
-        if (rsrc != null)
-            try {
-                rsrc.close();
-            }
-            catch (SQLException e) {
-                warn(log, "Failed to close resource: " + e.getMessage());
-            }
-    }
-
-    /**
-     * Quietly closes JDBC result set ignoring possible checked exception.
-     *
-     * @param rsrc JDBC result set to close. If result set is {@code null}, 
it's no-op.
-     */
-    public static void closeQuiet(@Nullable ResultSet rsrc) {
-        if (rsrc != null)
-            try {
-                rsrc.close();
-            }
-            catch (SQLException ignored) {
-                // No-op.
-            }
-    }
-
-    /**
      * Closes class loader logging possible checked exception.
      * Note: this issue for problem <a 
href="http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=5041014";>
      * http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=5041014</a>.
@@ -6316,6 +6002,10 @@ public abstract class IgniteUtils {
         return arr;
     }
 
+    /**
+     * @param arr1 Array 1.
+     * @param arr2 Array 2.
+     */
     public static int[] addAll(int[] arr1, int[] arr2) {
         int[] all = new int[arr1.length + arr2.length];
 
@@ -6920,15 +6610,7 @@ public abstract class IgniteUtils {
     public static boolean isPrimitiveArray(Object obj) {
         Class<?> cls = obj.getClass();
 
-        return cls.isArray() && (
-            cls.equals(byte[].class) ||
-                cls.equals(short[].class) ||
-                cls.equals(char[].class) ||
-                cls.equals(int[].class) ||
-                cls.equals(long[].class) ||
-                cls.equals(float[].class) ||
-                cls.equals(double[].class) ||
-                cls.equals(boolean[].class));
+        return cls.isArray() && cls.getComponentType().isPrimitive();
     }
 
     /**
@@ -8620,15 +8302,15 @@ public abstract class IgniteUtils {
      * Converts a hexadecimal character to an integer.
      *
      * @param ch A character to convert to an integer digit
-     * @param index The index of the character in the source
+     * @param idx The index of the character in the source
      * @return An integer
      * @throws IgniteCheckedException Thrown if ch is an illegal hex character
      */
-    public static int toDigit(char ch, int index) throws 
IgniteCheckedException {
+    public static int toDigit(char ch, int idx) throws IgniteCheckedException {
         int digit = Character.digit(ch, 16);
 
         if (digit == -1)
-            throw new IgniteCheckedException("Illegal hexadecimal character " 
+ ch + " at index " + index);
+            throw new IgniteCheckedException("Illegal hexadecimal character " 
+ ch + " at index " + idx);
 
         return digit;
     }
@@ -9028,6 +8710,16 @@ public abstract class IgniteUtils {
 
     /**
      * @param c Collection.
+     * @return Resulting array list.
+     */
+    public static <T extends R, R> List<R> arrayList(Collection<T> c) {
+        assert c != null;
+
+        return new ArrayList<R>(c);
+    }
+
+    /**
+     * @param c Collection.
      * @param cap Initial capacity.
      * @param p Optional filters.
      * @return Resulting array list.
@@ -9037,7 +8729,7 @@ public abstract class IgniteUtils {
         assert c != null;
         assert cap >= 0;
 
-        ArrayList<R> list = new ArrayList<>(cap);
+        List<R> list = new ArrayList<>(cap);
 
         for (T t : c) {
             if (F.isAll(t, p))

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2bfb8932/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
index 168ae52..42bad22 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
@@ -22,6 +22,7 @@ import org.apache.ignite.lang.*;
 import org.apache.ignite.spi.*;
 import org.jetbrains.annotations.*;
 
+import java.io.*;
 import java.util.*;
 
 /**
@@ -137,4 +138,10 @@ public interface DiscoverySpi extends IgniteSpi {
      *         does not support this method.
      */
     public long getGridStartTime();
+
+    /**
+     * Sends custom message across the ring.
+     * @param evt Event.
+     */
+    public void sendCustomEvent(Serializable evt);
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2bfb8932/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiListener.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiListener.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiListener.java
index 3e0ef02..243aaeb 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiListener.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiListener.java
@@ -20,6 +20,7 @@ package org.apache.ignite.spi.discovery;
 import org.apache.ignite.cluster.*;
 import org.jetbrains.annotations.*;
 
+import java.io.*;
 import java.util.*;
 
 /**
@@ -37,7 +38,13 @@ public interface DiscoverySpiListener {
      * @param topSnapshot Topology snapshot after event has been occurred 
(e.g. if event is
      *      {@code EVT_NODE_JOINED}, then joined node will be in snapshot).
      * @param topHist Topology snapshots history.
+     * @param data Data for custom event.
      */
-    public void onDiscovery(int type, long topVer, ClusterNode node, 
Collection<ClusterNode> topSnapshot,
-        @Nullable Map<Long, Collection<ClusterNode>> topHist);
+    public void onDiscovery(
+        int type,
+        long topVer,
+        ClusterNode node,
+        Collection<ClusterNode> topSnapshot,
+        @Nullable Map<Long, Collection<ClusterNode>> topHist,
+        @Nullable Serializable data);
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2bfb8932/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
index 51df9db..17d8386 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
@@ -373,6 +373,11 @@ public class TcpClientDiscoverySpi extends 
TcpDiscoverySpiAdapter implements Tcp
         // No-op.
     }
 
+    /** {@inheritDoc} */
+    @Override public void sendCustomEvent(Serializable evt) {
+        throw new UnsupportedOperationException();
+    }
+
     /**
      * @param recon Reconnect flag.
      * @return Whether joined successfully.
@@ -1220,7 +1225,7 @@ public class TcpClientDiscoverySpi extends 
TcpDiscoverySpiAdapter implements Tcp
                     log.debug("Discovery notification [node=" + node + ", 
type=" + U.gridEventName(type) +
                         ", topVer=" + topVer + ']');
 
-                lsnr.onDiscovery(type, topVer, node, top, new 
TreeMap<>(topHist));
+                lsnr.onDiscovery(type, topVer, node, top, new 
TreeMap<>(topHist), null);
             }
             else if (log.isDebugEnabled())
                 log.debug("Skipped discovery notification [node=" + node + ", 
type=" + U.gridEventName(type) +

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2bfb8932/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 87aebc4..f910556 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -1028,7 +1028,7 @@ public class TcpDiscoverySpi extends 
TcpDiscoverySpiAdapter implements TcpDiscov
 
                     Map<Long, Collection<ClusterNode>> hist = 
updateTopologyHistory(topVer, top);
 
-                    lsnr.onDiscovery(EVT_NODE_FAILED, topVer, n, top, hist);
+                    lsnr.onDiscovery(EVT_NODE_FAILED, topVer, n, top, hist, 
null);
                 }
             }
         }
@@ -1242,6 +1242,11 @@ public class TcpDiscoverySpi extends 
TcpDiscoverySpiAdapter implements TcpDiscov
         this.nodeAuth = nodeAuth;
     }
 
+    /** {@inheritDoc} */
+    @Override public void sendCustomEvent(Serializable evt) {
+        msgWorker.addMessage(new 
TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt));
+    }
+
     /**
      * Tries to join this node to topology.
      *
@@ -1714,11 +1719,11 @@ public class TcpDiscoverySpi extends 
TcpDiscoverySpiAdapter implements TcpDiscov
                 log.debug("Discovery notification [node=" + node + ", 
spiState=" + spiState +
                     ", type=" + U.gridEventName(type) + ", topVer=" + topVer + 
']');
 
-            Collection<ClusterNode> top = F.<TcpDiscoveryNode, 
ClusterNode>upcast(ring.visibleNodes());
+            Collection<ClusterNode> top = F.upcast(ring.visibleNodes());
 
             Map<Long, Collection<ClusterNode>> hist = 
updateTopologyHistory(topVer, top);
 
-            lsnr.onDiscovery(type, topVer, node, top, hist);
+            lsnr.onDiscovery(type, topVer, node, top, hist, null);
         }
         else if (log.isDebugEnabled())
             log.debug("Skipped discovery notification [node=" + node + ", 
spiState=" + spiState +
@@ -2563,6 +2568,9 @@ public class TcpDiscoverySpi extends 
TcpDiscoverySpiAdapter implements TcpDiscov
             else if (msg instanceof TcpDiscoveryDiscardMessage)
                 processDiscardMessage((TcpDiscoveryDiscardMessage)msg);
 
+            else if (msg instanceof TcpDiscoveryCustomEventMessage)
+                processCustomMessage((TcpDiscoveryCustomEventMessage)msg);
+
             else
                 assert false : "Unknown message type: " + 
msg.getClass().getSimpleName();
 
@@ -4429,6 +4437,29 @@ public class TcpDiscoverySpi extends 
TcpDiscoverySpiAdapter implements TcpDiscov
             if (ring.hasRemoteNodes())
                 sendMessageAcrossRing(msg);
         }
+
+        /**
+         * @param msg Message.
+         */
+        private void processCustomMessage(TcpDiscoveryCustomEventMessage msg) {
+            if (msg.creatorNodeId().equals(getLocalNodeId())) {
+                if (msg.senderNodeId() != null)
+                    return;
+
+                msg.senderNodeId(getLocalNodeId());
+            }
+
+            DiscoverySpiListener lsnr = TcpDiscoverySpi.this.lsnr;
+
+            TcpDiscoverySpiState spiState = spiStateCopy();
+
+            if (lsnr != null && (spiState == CONNECTED || spiState == 
DISCONNECTING))
+                lsnr.onDiscovery(EVT_DISCOVERY_CUSTOM_EVT, 
msg.topologyVersion(), ring.node(msg.creatorNodeId()), null,
+                    null, msg.message());
+
+            if (ring.hasRemoteNodes())
+                sendMessageAcrossRing(msg);
+        }
     }
 
     /**
@@ -4596,7 +4627,7 @@ public class TcpDiscoverySpi extends 
TcpDiscoverySpiAdapter implements TcpDiscov
                         }
                     }
 
-                    if (!U.bytesEqual(buf, 0, U.IGNITE_HEADER, 0, 4)) {
+                    if (!Arrays.equals(buf, U.IGNITE_HEADER)) {
                         if (log.isDebugEnabled())
                             log.debug("Unknown connection detected (is some 
other software connecting to " +
                                 "this Ignite port?) " +

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2bfb8932/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
index 52156a4..e47e490 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
@@ -990,6 +990,9 @@ abstract class TcpDiscoverySpiAdapter extends 
IgniteSpiAdapter implements Discov
                 log.debug("Message has been added to queue: " + msg);
         }
 
+        /**
+         * @param msg Message.
+         */
         protected abstract void processMessage(TcpDiscoveryAbstractMessage 
msg);
 
         /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2bfb8932/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
new file mode 100644
index 0000000..f83765a
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp.messages;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Wrapped for custom message.
+ */
+public class TcpDiscoveryCustomEventMessage extends 
TcpDiscoveryAbstractMessage {
+    /** */
+    private Serializable msg;
+
+    /**
+     * Public default no-arg constructor for {@link Externalizable} interface.
+     */
+    public TcpDiscoveryCustomEventMessage() {
+        // No-op.
+    }
+
+    /**
+     * @param creatorNodeId Creator node id.
+     */
+    public TcpDiscoveryCustomEventMessage(UUID creatorNodeId, Serializable 
msg) {
+        super(creatorNodeId);
+
+        this.msg = msg;
+    }
+
+    /**
+     * @return Message.
+     */
+    public Serializable message() {
+        return msg;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        super.writeExternal(out);
+
+        out.writeObject(msg);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
+        super.readExternal(in);
+
+        msg = (Serializable)in.readObject();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2bfb8932/modules/core/src/test/java/org/apache/ignite/internal/GridDiscoveryEventSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/GridDiscoveryEventSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/GridDiscoveryEventSelfTest.java
index ce8b3e9..64f939c 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/GridDiscoveryEventSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/GridDiscoveryEventSelfTest.java
@@ -417,4 +417,47 @@ public class GridDiscoveryEventSelfTest extends 
GridCommonAbstractTest {
             stopAllGrids();
         }
     }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCustomEvents() throws Exception {
+        try {
+            Ignite g0 = startGrid(0);
+            final Ignite g1 = startGrid(1);
+            Ignite g2 = startGrid(2);
+
+            final CountDownLatch cnt = new CountDownLatch(3);
+
+            IgnitePredicate<DiscoveryCustomEvent> lsnr = new 
IgnitePredicate<DiscoveryCustomEvent>() {
+                @Override public boolean apply(DiscoveryCustomEvent evt) {
+                    assert cnt.getCount() > 0;
+
+                    cnt.countDown();
+
+                    return true;
+                }
+            };
+
+            g0.events().localListen(lsnr, EVT_DISCOVERY_CUSTOM_EVT);
+            g1.events().localListen(lsnr, EVT_DISCOVERY_CUSTOM_EVT);
+            g2.events().localListen(lsnr, EVT_DISCOVERY_CUSTOM_EVT);
+
+            ((IgniteKernal)g1).context().discovery().sendCustomEvent("a");
+
+            cnt.await();
+
+            g0.events().localQuery(new IgnitePredicate<DiscoveryCustomEvent>() 
{
+                @Override public boolean apply(DiscoveryCustomEvent evt) {
+                    assert "a".equals(evt.data());
+                    assert 
((IgniteEx)g1).localNode().id().equals(evt.eventNode().id());
+
+                    return true;
+                }
+            }, EVT_DISCOVERY_CUSTOM_EVT);
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2bfb8932/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java
index 7f48f69..8474573 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java
@@ -132,7 +132,7 @@ public abstract class AbstractDiscoverySelfTest<T extends 
IgniteSpi> extends Gri
 
         /** {@inheritDoc} */
         @Override public void onDiscovery(int type, long topVer, ClusterNode 
node, Collection<ClusterNode> topSnapshot,
-            Map<Long, Collection<ClusterNode>> topHist) {
+            Map<Long, Collection<ClusterNode>> topHist, Serializable data) {
             if (type == EVT_NODE_METRICS_UPDATED)
                 isMetricsUpdate = true;
         }
@@ -204,7 +204,8 @@ public abstract class AbstractDiscoverySelfTest<T extends 
IgniteSpi> extends Gri
 
             DiscoverySpiListener locHeartbeatLsnr = new DiscoverySpiListener() 
{
                 @Override public void onDiscovery(int type, long topVer, 
ClusterNode node,
-                    Collection<ClusterNode> topSnapshot, Map<Long, 
Collection<ClusterNode>> topHist) {
+                    Collection<ClusterNode> topSnapshot, Map<Long, 
Collection<ClusterNode>> topHist,
+                    Serializable data) {
                     // If METRICS_UPDATED came from local node
                     if (type == EVT_NODE_METRICS_UPDATED
                         && node.id().equals(spi.getLocalNode().id()))
@@ -368,7 +369,7 @@ public abstract class AbstractDiscoverySelfTest<T extends 
IgniteSpi> extends Gri
                 spi.setListener(new DiscoverySpiListener() {
                     @SuppressWarnings({"NakedNotify"})
                     @Override public void onDiscovery(int type, long topVer, 
ClusterNode node,
-                        Collection<ClusterNode> topSnapshot, Map<Long, 
Collection<ClusterNode>> topHist) {
+                        Collection<ClusterNode> topSnapshot, Map<Long, 
Collection<ClusterNode>> topHist, Serializable data) {
                         info("Discovery event [type=" + type + ", node=" + 
node + ']');
 
                         synchronized (mux) {

Reply via email to