ignite-312 review
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/2bfb8932 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/2bfb8932 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/2bfb8932 Branch: refs/heads/ignite-264 Commit: 2bfb89322473b6579a59d114b45eb6e9532df4d6 Parents: 7423b7c Author: Yakov Zhdanov <yzhda...@gridgain.com> Authored: Wed Feb 25 22:07:59 2015 +0300 Committer: Yakov Zhdanov <yzhda...@gridgain.com> Committed: Wed Feb 25 22:07:59 2015 +0300 ---------------------------------------------------------------------- .../ignite/events/DiscoveryCustomEvent.java | 56 +++ .../org/apache/ignite/events/EventType.java | 12 + .../discovery/GridDiscoveryManager.java | 78 ++-- .../ignite/internal/util/IgniteUtils.java | 364 ++----------------- .../ignite/spi/discovery/DiscoverySpi.java | 7 + .../spi/discovery/DiscoverySpiListener.java | 11 +- .../discovery/tcp/TcpClientDiscoverySpi.java | 7 +- .../spi/discovery/tcp/TcpDiscoverySpi.java | 39 +- .../discovery/tcp/TcpDiscoverySpiAdapter.java | 3 + .../TcpDiscoveryCustomEventMessage.java | 66 ++++ .../internal/GridDiscoveryEventSelfTest.java | 43 +++ .../discovery/AbstractDiscoverySelfTest.java | 7 +- 12 files changed, 321 insertions(+), 372 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2bfb8932/modules/core/src/main/java/org/apache/ignite/events/DiscoveryCustomEvent.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/events/DiscoveryCustomEvent.java b/modules/core/src/main/java/org/apache/ignite/events/DiscoveryCustomEvent.java new file mode 100644 index 0000000..af15055 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/events/DiscoveryCustomEvent.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.events; + +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.io.*; + +/** + * + */ +public class DiscoveryCustomEvent extends DiscoveryEvent { + /** */ + private Serializable data; + + /** + * Default constructor. + */ + public DiscoveryCustomEvent() { + type(EventType.EVT_DISCOVERY_CUSTOM_EVT); + } + + /** + * @return Data. + */ + public Serializable data() { + return data; + } + + /** + * @param data New data. + */ + public void data(Serializable data) { + this.data = data; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(DiscoveryCustomEvent.class, this, super.toString()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2bfb8932/modules/core/src/main/java/org/apache/ignite/events/EventType.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/events/EventType.java b/modules/core/src/main/java/org/apache/ignite/events/EventType.java index 9f7e2c4..d9ffc2a 100644 --- a/modules/core/src/main/java/org/apache/ignite/events/EventType.java +++ b/modules/core/src/main/java/org/apache/ignite/events/EventType.java @@ -17,8 +17,10 @@ package org.apache.ignite.events; +import org.apache.ignite.internal.managers.discovery.*; import org.apache.ignite.internal.util.typedef.internal.*; +import java.io.*; import java.util.*; /** @@ -158,6 +160,16 @@ public interface EventType { public static final int EVT_CLIENT_NODE_RECONNECTED = 17; /** + * Built-in event type: custom event sent. + * <br> + * Generated when someone invoke {@link GridDiscoveryManager#sendCustomEvent(Serializable)}. + * <p> + * + * @see DiscoveryCustomEvent + */ + public static final int EVT_DISCOVERY_CUSTOM_EVT = 18; + + /** * Built-in event type: task started. * <p> * NOTE: all types in range <b>from 1 to 1000 are reserved</b> for http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2bfb8932/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index 711229a..b65da50 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -255,8 +255,14 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } getSpi().setListener(new DiscoverySpiListener() { - @Override public void onDiscovery(int type, long topVer, ClusterNode node, Collection<ClusterNode> topSnapshot, - Map<Long, Collection<ClusterNode>> snapshots) { + @Override public void onDiscovery( + int type, + long topVer, + ClusterNode node, + Collection<ClusterNode> topSnapshot, + Map<Long, Collection<ClusterNode>> snapshots, + @Nullable Serializable data + ) { final ClusterNode locNode = localNode(); if (snapshots != null) @@ -270,7 +276,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { // Put topology snapshot into discovery history. // There is no race possible between history maintenance and concurrent discovery // event notifications, since SPI notifies manager about all events from this listener. - if (type != EVT_NODE_METRICS_UPDATED) { + if (type != EVT_NODE_METRICS_UPDATED && type != EVT_DISCOVERY_CUSTOM_EVT) { DiscoCache cache = new DiscoCache(locNode, F.view(topSnapshot, F.remoteNodes(locNode.id()))); discoCacheHist.put(topVer, cache); @@ -305,7 +311,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { ", evt=" + U.gridEventName(type) + ']'; } - discoWrk.addEvent(type, topVer, node, topSnapshot); + discoWrk.addEvent(type, topVer, node, topSnapshot, data); } }); @@ -778,12 +784,11 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { // Stop receiving notifications. getSpi().setListener(null); - // Stop discovery worker. + // Stop discovery worker and metrics updater. U.cancel(discoWrk); - U.join(discoWrk, log); - - // Stop metrics updater. U.cancel(metricsUpdater); + + U.join(discoWrk, log); U.join(metricsUpdater, log); // Stop SPI itself. @@ -1185,6 +1190,13 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { ).start(); } + /** + * @param evt Event. + */ + public void sendCustomEvent(Serializable evt) { + getSpi().sendCustomEvent(evt); + } + /** Worker for network segment checks. */ private class SegmentCheckWorker extends GridWorker { /** */ @@ -1241,7 +1253,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { if (!segValid) { discoWrk.addEvent(EVT_NODE_SEGMENTED, 0, getSpi().getLocalNode(), - Collections.<ClusterNode>emptyList()); + Collections.<ClusterNode>emptyList(), null); lastSegChkRes.set(false); } @@ -1261,7 +1273,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { /** Worker for discovery events. */ private class DiscoveryWorker extends GridWorker { /** Event queue. */ - private final BlockingQueue<GridTuple4<Integer, Long, ClusterNode, Collection<ClusterNode>>> evts = + private final BlockingQueue<GridTuple5<Integer, Long, ClusterNode, Collection<ClusterNode>, Serializable>> evts = new LinkedBlockingQueue<>(); /** Node segmented event fired flag. */ @@ -1292,12 +1304,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { evt.eventNode(node); evt.type(type); - evt.topologySnapshot(topVer, new ArrayList<>( - F.viewReadOnly(topSnapshot, new C1<ClusterNode, ClusterNode>() { - @Override public ClusterNode apply(ClusterNode e) { - return e; - } - }, daemonFilter))); + evt.topologySnapshot(topVer, U.<ClusterNode, ClusterNode>arrayList(topSnapshot, daemonFilter)); if (type == EVT_NODE_METRICS_UPDATED) evt.message("Metrics were updated: " + node); @@ -1327,10 +1334,16 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * @param node Node. * @param topSnapshot Topology snapshot. */ - void addEvent(int type, long topVer, ClusterNode node, Collection<ClusterNode> topSnapshot) { + void addEvent( + int type, + long topVer, + ClusterNode node, + Collection<ClusterNode> topSnapshot, + @Nullable Serializable data + ) { assert node != null; - evts.add(F.t(type, topVer, node, topSnapshot)); + evts.add(F.t(type, topVer, node, topSnapshot, data)); } /** @@ -1364,7 +1377,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { /** @throws InterruptedException If interrupted. */ @SuppressWarnings("DuplicateCondition") private void body0() throws InterruptedException { - GridTuple4<Integer, Long, ClusterNode, Collection<ClusterNode>> evt = evts.take(); + GridTuple5<Integer, Long, ClusterNode, Collection<ClusterNode>, Serializable> evt = evts.take(); int type = evt.get1(); @@ -1473,6 +1486,22 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { break; } + case EVT_DISCOVERY_CUSTOM_EVT: { + DiscoveryCustomEvent customEvt = new DiscoveryCustomEvent(); + + customEvt.node(ctx.discovery().localNode()); + customEvt.eventNode(node); + customEvt.type(type); + customEvt.topologySnapshot(topVer, null); + customEvt.data(evt.get5()); + + assert ctx.event().isRecordable(EVT_DISCOVERY_CUSTOM_EVT); + + ctx.event().record(customEvt); + + return; + } + // Don't log metric update to avoid flooding the log. case EVT_NODE_METRICS_UPDATED: break; @@ -2058,12 +2087,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * @param exclNode Node to exclude. */ private void filterNodeMap(ConcurrentMap<String, Collection<ClusterNode>> map, final ClusterNode exclNode) { - IgnitePredicate<ClusterNode> p = new P1<ClusterNode>() { - @Override public boolean apply(ClusterNode e) { - return exclNode.equals(e); - } - }; - for (String cacheName : U.cacheNames(exclNode)) { String maskedName = maskNull(cacheName); @@ -2073,7 +2096,10 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { if (oldNodes == null || oldNodes.isEmpty()) break; - Collection<ClusterNode> newNodes = F.lose(oldNodes, true, p); + Collection<ClusterNode> newNodes = new ArrayList<>(oldNodes); + + if (!newNodes.remove(exclNode)) + break; if (map.replace(maskedName, oldNodes, newNodes)) break; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2bfb8932/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java index e4b23f1..b9c7596 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java @@ -59,7 +59,6 @@ import java.math.*; import java.net.*; import java.nio.*; import java.nio.channels.*; -import java.nio.channels.spi.*; import java.nio.charset.*; import java.security.*; import java.security.cert.*; @@ -647,15 +646,7 @@ public abstract class IgniteUtils { * @return Nearest power of 2. */ public static int ceilPow2(int v) { - v--; - - v |= v >> 1; - v |= v >> 2; - v |= v >> 4; - v |= v >> 8; - v |= v >> 16; - - return ++v; + return Integer.highestOneBit(v - 1) << 1; } /** @@ -1633,13 +1624,11 @@ public abstract class IgniteUtils { if (!itf.isLoopback()) { Enumeration<InetAddress> addrs = itf.getInetAddresses(); - if (addrs != null) { - for (InetAddress addr : asIterable(addrs)) { - String hostAddr = addr.getHostAddress(); + for (InetAddress addr : asIterable(addrs)) { + String hostAddr = addr.getHostAddress(); - if (!addr.isLoopbackAddress() && !ips.contains(hostAddr)) - ips.add(hostAddr); - } + if (!addr.isLoopbackAddress() && !ips.contains(hostAddr)) + ips.add(hostAddr); } } } @@ -2832,15 +2821,8 @@ public abstract class IgniteUtils { if (s == null) return; - OutputStream out = null; - - try { - out = new FileOutputStream(file, append); - - if (s != null) - out.write(s.getBytes(charset)); - } finally { - closeQuiet(out); + try (OutputStream out = new FileOutputStream(file, append)) { + out.write(s.getBytes(charset)); } } @@ -3311,35 +3293,17 @@ public abstract class IgniteUtils { } /** - * Checks for containment of value matching given regular expression in the provided array. - * - * @param arr Array of strings. - * @param regex Regular expression. - * @return {@code true} if string matching given regular expression found, {@code false} otherwise. - */ - public static boolean containsRegexArray(String[] arr, String regex) { - assert arr != null; - assert regex != null; - - for (String s : arr) - if (s != null && s.matches(regex)) - return true; - - return false; - } - - /** * Closes given resource logging possible checked exception. * * @param rsrc Resource to close. If it's {@code null} - it's no-op. * @param log Logger to log possible checked exception with (optional). */ - public static void close(@Nullable Closeable rsrc, @Nullable IgniteLogger log) { + public static void close(@Nullable AutoCloseable rsrc, @Nullable IgniteLogger log) { if (rsrc != null) try { rsrc.close(); } - catch (IOException e) { + catch (Exception e) { warn(log, "Failed to close resource: " + e.getMessage()); } } @@ -3360,114 +3324,6 @@ public abstract class IgniteUtils { } /** - * Quietly closes given resource ignoring possible checked exception. - * - * @param rsrc Resource to close. If it's {@code null} - it's no-op. - */ - public static void closeQuiet(@Nullable Closeable rsrc) { - if (rsrc != null) - try { - rsrc.close(); - } - catch (IOException ignored) { - // No-op. - } - } - - /** - * Closes given resource logging possible checked exception. - * - * @param rsrc Resource to close. If it's {@code null} - it's no-op. - * @param log Logger to log possible checked exception with (optional). - */ - public static void close(@Nullable Socket rsrc, @Nullable IgniteLogger log) { - if (rsrc != null) - try { - rsrc.close(); - } - catch (IOException e) { - warn(log, "Failed to close resource: " + e.getMessage()); - } - } - - /** - * Quietly closes given resource ignoring possible checked exception. - * - * @param rsrc Resource to close. If it's {@code null} - it's no-op. - */ - public static void closeQuiet(@Nullable Socket rsrc) { - if (rsrc != null) - try { - rsrc.close(); - } - catch (IOException ignored) { - // No-op. - } - } - - /** - * Closes given resource logging possible checked exception. - * - * @param rsrc Resource to close. If it's {@code null} - it's no-op. - * @param log Logger to log possible checked exception with (optional). - */ - public static void close(@Nullable ServerSocket rsrc, @Nullable IgniteLogger log) { - if (rsrc != null) - try { - rsrc.close(); - } - catch (IOException e) { - warn(log, "Failed to close resource: " + e.getMessage()); - } - } - - /** - * Quietly closes given resource ignoring possible checked exception. - * - * @param rsrc Resource to close. If it's {@code null} - it's no-op. - */ - public static void closeQuiet(@Nullable ServerSocket rsrc) { - if (rsrc != null) - try { - rsrc.close(); - } - catch (IOException ignored) { - // No-op. - } - } - - /** - * Closes given resource logging possible checked exception. - * - * @param rsrc Resource to close. If it's {@code null} - it's no-op. - * @param log Logger to log possible checked exception with (optional). - */ - public static void close(@Nullable AbstractInterruptibleChannel rsrc, @Nullable IgniteLogger log) { - if (rsrc != null) - try { - rsrc.close(); - } - catch (IOException e) { - warn(log, "Failed to close resource: " + e.getMessage()); - } - } - - /** - * Quietly closes given resource ignoring possible checked exception. - * - * @param rsrc Resource to close. If it's {@code null} - it's no-op. - */ - public static void closeQuiet(@Nullable AbstractInterruptibleChannel rsrc) { - if (rsrc != null) - try { - rsrc.close(); - } - catch (IOException ignored) { - // No-op. - } - } - - /** * Closes given resource logging possible checked exceptions. * * @param rsrc Resource to close. If it's {@code null} - it's no-op. @@ -3491,83 +3347,6 @@ public abstract class IgniteUtils { } /** - * Closes given resource logging possible checked exception. - * - * @param rsrc Resource to close. If it's {@code null} - it's no-op. - * @param log Logger to log possible checked exception with (optional). - */ - public static void close(@Nullable Reader rsrc, @Nullable IgniteLogger log) { - if (rsrc != null) - try { - rsrc.close(); - } - catch (IOException e) { - warn(log, "Failed to close resource: " + e.getMessage()); - } - } - - /** - * Quietly closes given resource ignoring possible checked exception. - * - * @param rsrc Resource to close. If it's {@code null} - it's no-op. - */ - public static void closeQuiet(@Nullable Reader rsrc) { - if (rsrc != null) - try { - rsrc.close(); - } - catch (IOException ignored) { - // No-op. - } - } - - /** - * Quietly closes given resource ignoring possible checked exception. - * - * @param rsrc Resource to close. If it's {@code null} - it's no-op. - */ - public static void closeQuiet(@Nullable Writer rsrc) { - if (rsrc != null) - try { - rsrc.close(); - } - catch (IOException ignored) { - // No-op. - } - } - - /** - * Closes given resource logging possible checked exception. - * - * @param rsrc Resource to close. If it's {@code null} - it's no-op. - * @param log Logger to log possible checked exception with (optional). - */ - public static void close(@Nullable ZipFile rsrc, @Nullable IgniteLogger log) { - if (rsrc != null) - try { - rsrc.close(); - } - catch (IOException e) { - warn(log, "Failed to close resource: " + e.getMessage()); - } - } - - /** - * Quietly closes given resource ignoring possible checked exception. - * - * @param rsrc Resource to close. If it's {@code null} - it's no-op. - */ - public static void closeQuiet(@Nullable ZipFile rsrc) { - if (rsrc != null) - try { - rsrc.close(); - } - catch (IOException ignored) { - // No-op. - } - } - - /** * Closes given resource. * * @param rsrc Resource to close. If it's {@code null} - it's no-op. @@ -3642,99 +3421,6 @@ public abstract class IgniteUtils { } /** - * Closes JDBC connection logging possible checked exception. - * - * @param rsrc JDBC connection to close. If connection is {@code null}, it's no-op. - * @param log Logger to log possible checked exception with (optional). - */ - public static void close(@Nullable Connection rsrc, @Nullable IgniteLogger log) { - if (rsrc != null) - try { - rsrc.close(); - } - catch (SQLException e) { - warn(log, "Failed to close resource: " + e.getMessage()); - } - } - - /** - * Quietly closes JDBC connection ignoring possible checked exception. - * - * @param rsrc JDBC connection to close. If connection is {@code null}, it's no-op. - */ - public static void closeQuiet(@Nullable Connection rsrc) { - if (rsrc != null) - try { - rsrc.close(); - } - catch (SQLException ignored) { - // No-op. - } - } - - /** - * Closes JDBC statement logging possible checked exception. - * - * @param rsrc JDBC statement to close. If statement is {@code null}, it's no-op. - * @param log Logger to log possible checked exception with (optional). - */ - public static void close(@Nullable Statement rsrc, @Nullable IgniteLogger log) { - if (rsrc != null) - try { - rsrc.close(); - } - catch (SQLException e) { - warn(log, "Failed to close resource: " + e.getMessage()); - } - } - - /** - * Quietly closes JDBC statement ignoring possible checked exception. - * - * @param rsrc JDBC statement to close. If statement is {@code null}, it's no-op. - */ - public static void closeQuiet(@Nullable Statement rsrc) { - if (rsrc != null) - try { - rsrc.close(); - } - catch (SQLException ignored) { - // No-op. - } - } - - /** - * Closes JDBC result set logging possible checked exception. - * - * @param rsrc JDBC result set to close. If result set is {@code null}, it's no-op. - * @param log Logger to log possible checked exception with (optional). - */ - public static void close(@Nullable ResultSet rsrc, @Nullable IgniteLogger log) { - if (rsrc != null) - try { - rsrc.close(); - } - catch (SQLException e) { - warn(log, "Failed to close resource: " + e.getMessage()); - } - } - - /** - * Quietly closes JDBC result set ignoring possible checked exception. - * - * @param rsrc JDBC result set to close. If result set is {@code null}, it's no-op. - */ - public static void closeQuiet(@Nullable ResultSet rsrc) { - if (rsrc != null) - try { - rsrc.close(); - } - catch (SQLException ignored) { - // No-op. - } - } - - /** * Closes class loader logging possible checked exception. * Note: this issue for problem <a href="http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=5041014"> * http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=5041014</a>. @@ -6316,6 +6002,10 @@ public abstract class IgniteUtils { return arr; } + /** + * @param arr1 Array 1. + * @param arr2 Array 2. + */ public static int[] addAll(int[] arr1, int[] arr2) { int[] all = new int[arr1.length + arr2.length]; @@ -6920,15 +6610,7 @@ public abstract class IgniteUtils { public static boolean isPrimitiveArray(Object obj) { Class<?> cls = obj.getClass(); - return cls.isArray() && ( - cls.equals(byte[].class) || - cls.equals(short[].class) || - cls.equals(char[].class) || - cls.equals(int[].class) || - cls.equals(long[].class) || - cls.equals(float[].class) || - cls.equals(double[].class) || - cls.equals(boolean[].class)); + return cls.isArray() && cls.getComponentType().isPrimitive(); } /** @@ -8620,15 +8302,15 @@ public abstract class IgniteUtils { * Converts a hexadecimal character to an integer. * * @param ch A character to convert to an integer digit - * @param index The index of the character in the source + * @param idx The index of the character in the source * @return An integer * @throws IgniteCheckedException Thrown if ch is an illegal hex character */ - public static int toDigit(char ch, int index) throws IgniteCheckedException { + public static int toDigit(char ch, int idx) throws IgniteCheckedException { int digit = Character.digit(ch, 16); if (digit == -1) - throw new IgniteCheckedException("Illegal hexadecimal character " + ch + " at index " + index); + throw new IgniteCheckedException("Illegal hexadecimal character " + ch + " at index " + idx); return digit; } @@ -9028,6 +8710,16 @@ public abstract class IgniteUtils { /** * @param c Collection. + * @return Resulting array list. + */ + public static <T extends R, R> List<R> arrayList(Collection<T> c) { + assert c != null; + + return new ArrayList<R>(c); + } + + /** + * @param c Collection. * @param cap Initial capacity. * @param p Optional filters. * @return Resulting array list. @@ -9037,7 +8729,7 @@ public abstract class IgniteUtils { assert c != null; assert cap >= 0; - ArrayList<R> list = new ArrayList<>(cap); + List<R> list = new ArrayList<>(cap); for (T t : c) { if (F.isAll(t, p)) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2bfb8932/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java index 168ae52..42bad22 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java @@ -22,6 +22,7 @@ import org.apache.ignite.lang.*; import org.apache.ignite.spi.*; import org.jetbrains.annotations.*; +import java.io.*; import java.util.*; /** @@ -137,4 +138,10 @@ public interface DiscoverySpi extends IgniteSpi { * does not support this method. */ public long getGridStartTime(); + + /** + * Sends custom message across the ring. + * @param evt Event. + */ + public void sendCustomEvent(Serializable evt); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2bfb8932/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiListener.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiListener.java index 3e0ef02..243aaeb 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiListener.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiListener.java @@ -20,6 +20,7 @@ package org.apache.ignite.spi.discovery; import org.apache.ignite.cluster.*; import org.jetbrains.annotations.*; +import java.io.*; import java.util.*; /** @@ -37,7 +38,13 @@ public interface DiscoverySpiListener { * @param topSnapshot Topology snapshot after event has been occurred (e.g. if event is * {@code EVT_NODE_JOINED}, then joined node will be in snapshot). * @param topHist Topology snapshots history. + * @param data Data for custom event. */ - public void onDiscovery(int type, long topVer, ClusterNode node, Collection<ClusterNode> topSnapshot, - @Nullable Map<Long, Collection<ClusterNode>> topHist); + public void onDiscovery( + int type, + long topVer, + ClusterNode node, + Collection<ClusterNode> topSnapshot, + @Nullable Map<Long, Collection<ClusterNode>> topHist, + @Nullable Serializable data); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2bfb8932/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java index 51df9db..17d8386 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java @@ -373,6 +373,11 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp // No-op. } + /** {@inheritDoc} */ + @Override public void sendCustomEvent(Serializable evt) { + throw new UnsupportedOperationException(); + } + /** * @param recon Reconnect flag. * @return Whether joined successfully. @@ -1220,7 +1225,7 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp log.debug("Discovery notification [node=" + node + ", type=" + U.gridEventName(type) + ", topVer=" + topVer + ']'); - lsnr.onDiscovery(type, topVer, node, top, new TreeMap<>(topHist)); + lsnr.onDiscovery(type, topVer, node, top, new TreeMap<>(topHist), null); } else if (log.isDebugEnabled()) log.debug("Skipped discovery notification [node=" + node + ", type=" + U.gridEventName(type) + http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2bfb8932/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index 87aebc4..f910556 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -1028,7 +1028,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov Map<Long, Collection<ClusterNode>> hist = updateTopologyHistory(topVer, top); - lsnr.onDiscovery(EVT_NODE_FAILED, topVer, n, top, hist); + lsnr.onDiscovery(EVT_NODE_FAILED, topVer, n, top, hist, null); } } } @@ -1242,6 +1242,11 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov this.nodeAuth = nodeAuth; } + /** {@inheritDoc} */ + @Override public void sendCustomEvent(Serializable evt) { + msgWorker.addMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt)); + } + /** * Tries to join this node to topology. * @@ -1714,11 +1719,11 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov log.debug("Discovery notification [node=" + node + ", spiState=" + spiState + ", type=" + U.gridEventName(type) + ", topVer=" + topVer + ']'); - Collection<ClusterNode> top = F.<TcpDiscoveryNode, ClusterNode>upcast(ring.visibleNodes()); + Collection<ClusterNode> top = F.upcast(ring.visibleNodes()); Map<Long, Collection<ClusterNode>> hist = updateTopologyHistory(topVer, top); - lsnr.onDiscovery(type, topVer, node, top, hist); + lsnr.onDiscovery(type, topVer, node, top, hist, null); } else if (log.isDebugEnabled()) log.debug("Skipped discovery notification [node=" + node + ", spiState=" + spiState + @@ -2563,6 +2568,9 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov else if (msg instanceof TcpDiscoveryDiscardMessage) processDiscardMessage((TcpDiscoveryDiscardMessage)msg); + else if (msg instanceof TcpDiscoveryCustomEventMessage) + processCustomMessage((TcpDiscoveryCustomEventMessage)msg); + else assert false : "Unknown message type: " + msg.getClass().getSimpleName(); @@ -4429,6 +4437,29 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov if (ring.hasRemoteNodes()) sendMessageAcrossRing(msg); } + + /** + * @param msg Message. + */ + private void processCustomMessage(TcpDiscoveryCustomEventMessage msg) { + if (msg.creatorNodeId().equals(getLocalNodeId())) { + if (msg.senderNodeId() != null) + return; + + msg.senderNodeId(getLocalNodeId()); + } + + DiscoverySpiListener lsnr = TcpDiscoverySpi.this.lsnr; + + TcpDiscoverySpiState spiState = spiStateCopy(); + + if (lsnr != null && (spiState == CONNECTED || spiState == DISCONNECTING)) + lsnr.onDiscovery(EVT_DISCOVERY_CUSTOM_EVT, msg.topologyVersion(), ring.node(msg.creatorNodeId()), null, + null, msg.message()); + + if (ring.hasRemoteNodes()) + sendMessageAcrossRing(msg); + } } /** @@ -4596,7 +4627,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov } } - if (!U.bytesEqual(buf, 0, U.IGNITE_HEADER, 0, 4)) { + if (!Arrays.equals(buf, U.IGNITE_HEADER)) { if (log.isDebugEnabled()) log.debug("Unknown connection detected (is some other software connecting to " + "this Ignite port?) " + http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2bfb8932/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java index 52156a4..e47e490 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java @@ -990,6 +990,9 @@ abstract class TcpDiscoverySpiAdapter extends IgniteSpiAdapter implements Discov log.debug("Message has been added to queue: " + msg); } + /** + * @param msg Message. + */ protected abstract void processMessage(TcpDiscoveryAbstractMessage msg); /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2bfb8932/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java new file mode 100644 index 0000000..f83765a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp.messages; + +import java.io.*; +import java.util.*; + +/** + * Wrapped for custom message. + */ +public class TcpDiscoveryCustomEventMessage extends TcpDiscoveryAbstractMessage { + /** */ + private Serializable msg; + + /** + * Public default no-arg constructor for {@link Externalizable} interface. + */ + public TcpDiscoveryCustomEventMessage() { + // No-op. + } + + /** + * @param creatorNodeId Creator node id. + */ + public TcpDiscoveryCustomEventMessage(UUID creatorNodeId, Serializable msg) { + super(creatorNodeId); + + this.msg = msg; + } + + /** + * @return Message. + */ + public Serializable message() { + return msg; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + super.writeExternal(out); + + out.writeObject(msg); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + super.readExternal(in); + + msg = (Serializable)in.readObject(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2bfb8932/modules/core/src/test/java/org/apache/ignite/internal/GridDiscoveryEventSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridDiscoveryEventSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridDiscoveryEventSelfTest.java index ce8b3e9..64f939c 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/GridDiscoveryEventSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridDiscoveryEventSelfTest.java @@ -417,4 +417,47 @@ public class GridDiscoveryEventSelfTest extends GridCommonAbstractTest { stopAllGrids(); } } + + /** + * @throws Exception If failed. + */ + public void testCustomEvents() throws Exception { + try { + Ignite g0 = startGrid(0); + final Ignite g1 = startGrid(1); + Ignite g2 = startGrid(2); + + final CountDownLatch cnt = new CountDownLatch(3); + + IgnitePredicate<DiscoveryCustomEvent> lsnr = new IgnitePredicate<DiscoveryCustomEvent>() { + @Override public boolean apply(DiscoveryCustomEvent evt) { + assert cnt.getCount() > 0; + + cnt.countDown(); + + return true; + } + }; + + g0.events().localListen(lsnr, EVT_DISCOVERY_CUSTOM_EVT); + g1.events().localListen(lsnr, EVT_DISCOVERY_CUSTOM_EVT); + g2.events().localListen(lsnr, EVT_DISCOVERY_CUSTOM_EVT); + + ((IgniteKernal)g1).context().discovery().sendCustomEvent("a"); + + cnt.await(); + + g0.events().localQuery(new IgnitePredicate<DiscoveryCustomEvent>() { + @Override public boolean apply(DiscoveryCustomEvent evt) { + assert "a".equals(evt.data()); + assert ((IgniteEx)g1).localNode().id().equals(evt.eventNode().id()); + + return true; + } + }, EVT_DISCOVERY_CUSTOM_EVT); + } + finally { + stopAllGrids(); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2bfb8932/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java index 7f48f69..8474573 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java @@ -132,7 +132,7 @@ public abstract class AbstractDiscoverySelfTest<T extends IgniteSpi> extends Gri /** {@inheritDoc} */ @Override public void onDiscovery(int type, long topVer, ClusterNode node, Collection<ClusterNode> topSnapshot, - Map<Long, Collection<ClusterNode>> topHist) { + Map<Long, Collection<ClusterNode>> topHist, Serializable data) { if (type == EVT_NODE_METRICS_UPDATED) isMetricsUpdate = true; } @@ -204,7 +204,8 @@ public abstract class AbstractDiscoverySelfTest<T extends IgniteSpi> extends Gri DiscoverySpiListener locHeartbeatLsnr = new DiscoverySpiListener() { @Override public void onDiscovery(int type, long topVer, ClusterNode node, - Collection<ClusterNode> topSnapshot, Map<Long, Collection<ClusterNode>> topHist) { + Collection<ClusterNode> topSnapshot, Map<Long, Collection<ClusterNode>> topHist, + Serializable data) { // If METRICS_UPDATED came from local node if (type == EVT_NODE_METRICS_UPDATED && node.id().equals(spi.getLocalNode().id())) @@ -368,7 +369,7 @@ public abstract class AbstractDiscoverySelfTest<T extends IgniteSpi> extends Gri spi.setListener(new DiscoverySpiListener() { @SuppressWarnings({"NakedNotify"}) @Override public void onDiscovery(int type, long topVer, ClusterNode node, - Collection<ClusterNode> topSnapshot, Map<Long, Collection<ClusterNode>> topHist) { + Collection<ClusterNode> topSnapshot, Map<Long, Collection<ClusterNode>> topHist, Serializable data) { info("Discovery event [type=" + type + ", node=" + node + ']'); synchronized (mux) {