Repository: incubator-ignite Updated Branches: refs/heads/ignite-900 [created] 3a9ee47d0
# IGNITE-900 Refactoring data exchange API. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/3a9ee47d Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/3a9ee47d Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/3a9ee47d Branch: refs/heads/ignite-900 Commit: 3a9ee47d04b1165bf946b09ec7ffe28a1f1b3aab Parents: ae148f1 Author: sevdokimov <sevdoki...@gridgain.com> Authored: Fri Jul 24 15:44:23 2015 +0300 Committer: sevdokimov <sevdoki...@gridgain.com> Committed: Fri Jul 24 15:44:23 2015 +0300 ---------------------------------------------------------------------- .../apache/ignite/internal/GridComponent.java | 3 - .../managers/discovery/DataExchanger.java | 55 +++++++ .../discovery/DataExchangerAdapter.java | 46 ++++++ .../discovery/GridDiscoveryManager.java | 55 +++++-- .../continuous/GridContinuousProcessor.java | 160 +++++++++++-------- 5 files changed, 236 insertions(+), 83 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3a9ee47d/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java index 65e0644..c438cde 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java @@ -35,9 +35,6 @@ public interface GridComponent { */ enum DiscoveryDataExchangeType { /** */ - CONTINUOUS_PROC, - - /** */ CACHE_PROC, /** */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3a9ee47d/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DataExchanger.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DataExchanger.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DataExchanger.java new file mode 100644 index 0000000..75ce47d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DataExchanger.java @@ -0,0 +1,55 @@ +package org.apache.ignite.internal.managers.discovery;/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; + +/** + * + */ +public interface DataExchanger<T extends Serializable> { + /** + * Gets discovery data object that will be sent to new node + * during discovery process. + * + * @param joinedNodeId ID of new node that joins topology. + * @return Discovery data object or {@code null} if there is nothing + * to send for this component. + */ + @Nullable public T collectDiscoveryData(UUID joinedNodeId); + + /** + * @return Topic ID. Must be unique. + */ + public String topicId(); + + /** + * Receives discovery data object from remote nodes (called + * on new node during discovery process). + * @param joiningNodeId Joining node ID. + * @param rmtNodeId Remote node ID for which data is provided. + * @param data Discovery data object or {@code null} if nothing was + */ + public void onDiscoveryDataReceived(UUID joiningNodeId, UUID rmtNodeId, T data); + + /** + * @return {@code true} if provider should works on last node only. + */ + public boolean lastNodeOnly(); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3a9ee47d/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DataExchangerAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DataExchangerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DataExchangerAdapter.java new file mode 100644 index 0000000..3762be2 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DataExchangerAdapter.java @@ -0,0 +1,46 @@ +package org.apache.ignite.internal.managers.discovery;/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; + +public abstract class DataExchangerAdapter<T extends Serializable> implements DataExchanger<T> { + /** */ + private final String topicId; + + /** + * @param topicId Topic id. + */ + protected DataExchangerAdapter(String topicId) { + this.topicId = topicId; + } + + /** {@inheritDoc} */ + @Nullable @Override public abstract T collectDiscoveryData(UUID joinedNodeId); + + /** {@inheritDoc} */ + @Override public String topicId() { + return topicId; + } + + /** {@inheritDoc} */ + @Override public boolean lastNodeOnly() { + return false; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3a9ee47d/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index 068d374..e887b60 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -191,6 +191,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { /** */ private final CountDownLatch startLatch = new CountDownLatch(1); + /** */ + private final ConcurrentMap<String, DataExchanger<?>> dataExchangers = new ConcurrentHashMap8<>(); + /** @param ctx Context. */ public GridDiscoveryManager(GridKernalContext ctx) { super(ctx, ctx.config().getDiscoverySpi()); @@ -565,25 +568,51 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } } + HashMap<String, Serializable> exchangersMap = new HashMap<>(); + + for (DataExchanger<?> exchanger : dataExchangers.values()) { + if (!exchanger.lastNodeOnly()) { + Serializable o = exchanger.collectDiscoveryData(nodeId); + + exchangersMap.put(exchanger.topicId(), o); + } + } + + data.put(-1, exchangersMap); + return data; } @Override public void onExchange(UUID joiningNodeId, UUID nodeId, Map<Integer, Serializable> data) { for (Map.Entry<Integer, Serializable> e : data.entrySet()) { - GridComponent comp = null; + if (e.getKey() == -1) { + Map<String, Serializable> map = (Map<String, Serializable>)e.getValue(); - for (GridComponent c : ctx.components()) { - if (c.discoveryDataType() != null && c.discoveryDataType().ordinal() == e.getKey()) { - comp = c; + for (Map.Entry<String, Serializable> entry : map.entrySet()) { + DataExchanger exchanger = dataExchangers.get(entry.getKey()); - break; + if (exchanger != null) { + if (!exchanger.lastNodeOnly() || !joiningNodeId.equals(localNode().id())) + exchanger.onDiscoveryDataReceived(joiningNodeId, nodeId, entry.getValue()); + } } } + else { + GridComponent comp = null; - if (comp != null) - comp.onDiscoveryDataReceived(joiningNodeId, nodeId, e.getValue()); - else - U.warn(log, "Received discovery data for unknown component: " + e.getKey()); + for (GridComponent c : ctx.components()) { + if (c.discoveryDataType() != null && c.discoveryDataType().ordinal() == e.getKey()) { + comp = c; + + break; + } + } + + if (comp != null) + comp.onDiscoveryDataReceived(joiningNodeId, nodeId, e.getValue()); + else + U.warn(log, "Received discovery data for unknown component: " + e.getKey()); + } } } }); @@ -1652,6 +1681,14 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } /** + * @param dataExchanger Data exchanger. + */ + public void registerDataExchanger(DataExchanger<?> dataExchanger) { + if (dataExchangers.putIfAbsent(dataExchanger.topicId(), dataExchanger) != null) + throw new IllegalArgumentException("Duplicate topicId: " + dataExchanger.topicId()); + } + + /** * @param nodeId Node ID to fail. * @param warning Warning message to be shown on all nodes. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3a9ee47d/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index daa9494..7342b6a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -244,6 +244,74 @@ public class GridContinuousProcessor extends GridProcessorAdapter { } }); + ctx.discovery().registerDataExchanger(new DataExchangerAdapter<DiscoveryData>("GridContinuousProcessor") { + @Nullable @Override public DiscoveryData collectDiscoveryData(UUID joinedNodeId) { + if (joinedNodeId.equals(ctx.localNodeId()) && locInfos.isEmpty()) + return null; + + DiscoveryData data = new DiscoveryData(ctx.localNodeId()); + + // Collect listeners information (will be sent to joining node during discovery process). + for (Map.Entry<UUID, LocalRoutineInfo> e : locInfos.entrySet()) { + UUID routineId = e.getKey(); + LocalRoutineInfo info = e.getValue(); + + data.addItem(new DiscoveryDataItem(routineId, + info.prjPred, + info.hnd, + info.bufSize, + info.interval, + info.autoUnsubscribe)); + } + + return data; + } + + @Override public void onDiscoveryDataReceived(UUID joiningNodeId, UUID rmtNodeId, DiscoveryData data) { + if (ctx.isDaemon() || data == null) + return; + + for (DiscoveryDataItem item : data.items) { + try { + if (item.prjPred != null) + ctx.resource().injectGeneric(item.prjPred); + + // Register handler only if local node passes projection predicate. + if (item.prjPred == null || item.prjPred.apply(ctx.discovery().localNode())) { + if (registerHandler(data.nodeId, item.routineId, item.hnd, item.bufSize, item.interval, + item.autoUnsubscribe, false)) + item.hnd.onListenerRegistered(item.routineId, ctx); + } + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to register continuous handler.", e); + } + } + } + }); + + ctx.discovery().registerDataExchanger(new DataExchangerAdapter<SharedRoutineInfo>( + "GridContinuousProcessor.shared") { + @Nullable @Override public SharedRoutineInfo collectDiscoveryData(UUID joinedNodeId) { + return new SharedRoutineInfo(clientInfos); + } + + @Override public void onDiscoveryDataReceived(UUID joiningNodeId, UUID rmtNodeId, SharedRoutineInfo data) { + for (Map.Entry<UUID, Map<UUID, LocalRoutineInfo>> entry : data.clientInfos.entrySet()) { + Map<UUID, LocalRoutineInfo> map = clientInfos.get(entry.getKey()); + + if (map == null) { + map = new HashMap<>(); + + clientInfos.put(entry.getKey(), map); + } + + map.putAll(entry.getValue()); + } + } + }); + + if (log.isDebugEnabled()) log.debug("Continuous processor started."); } @@ -318,71 +386,6 @@ public class GridContinuousProcessor extends GridProcessorAdapter { log.debug("Continuous processor stopped."); } - /** {@inheritDoc} */ - @Nullable @Override public DiscoveryDataExchangeType discoveryDataType() { - return DiscoveryDataExchangeType.CONTINUOUS_PROC; - } - - /** {@inheritDoc} */ - @Override @Nullable public Serializable collectDiscoveryData(UUID nodeId) { - if (!nodeId.equals(ctx.localNodeId()) || !locInfos.isEmpty()) { - DiscoveryData data = new DiscoveryData(ctx.localNodeId(), clientInfos); - - // Collect listeners information (will be sent to joining node during discovery process). - for (Map.Entry<UUID, LocalRoutineInfo> e : locInfos.entrySet()) { - UUID routineId = e.getKey(); - LocalRoutineInfo info = e.getValue(); - - data.addItem(new DiscoveryDataItem(routineId, - info.prjPred, - info.hnd, - info.bufSize, - info.interval, - info.autoUnsubscribe)); - } - - return data; - } - - return null; - } - - /** {@inheritDoc} */ - @Override public void onDiscoveryDataReceived(UUID joiningNodeId, UUID rmtNodeId, Serializable obj) { - DiscoveryData data = (DiscoveryData)obj; - - if (!ctx.isDaemon() && data != null) { - for (DiscoveryDataItem item : data.items) { - try { - if (item.prjPred != null) - ctx.resource().injectGeneric(item.prjPred); - - // Register handler only if local node passes projection predicate. - if (item.prjPred == null || item.prjPred.apply(ctx.discovery().localNode())) { - if (registerHandler(data.nodeId, item.routineId, item.hnd, item.bufSize, item.interval, - item.autoUnsubscribe, false)) - item.hnd.onListenerRegistered(item.routineId, ctx); - } - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to register continuous handler.", e); - } - } - - for (Map.Entry<UUID, Map<UUID, LocalRoutineInfo>> entry : data.clientInfos.entrySet()) { - Map<UUID, LocalRoutineInfo> map = clientInfos.get(entry.getKey()); - - if (map == null) { - map = new HashMap<>(); - - clientInfos.put(entry.getKey(), map); - } - - map.putAll(entry.getValue()); - } - } - } - /** * Callback invoked when cache is started. * @@ -1260,6 +1263,26 @@ public class GridContinuousProcessor extends GridProcessorAdapter { } /** + * + */ + private static class SharedRoutineInfo implements Serializable { + /** */ + private final Map<UUID, Map<UUID, LocalRoutineInfo>> clientInfos; + + /** + * @param clientInfos Client infos. + */ + SharedRoutineInfo(Map<UUID, Map<UUID, LocalRoutineInfo>> clientInfos) { + this.clientInfos = clientInfos; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(SharedRoutineInfo.class, this); + } + } + + /** * Discovery data. */ private static class DiscoveryData implements Externalizable { @@ -1285,15 +1308,12 @@ public class GridContinuousProcessor extends GridProcessorAdapter { /** * @param nodeId Node ID. - * @param clientInfos Client information. */ - DiscoveryData(UUID nodeId, Map<UUID, Map<UUID, LocalRoutineInfo>> clientInfos) { + DiscoveryData(UUID nodeId) { assert nodeId != null; this.nodeId = nodeId; - this.clientInfos = clientInfos; - items = new ArrayList<>(); } @@ -1308,14 +1328,12 @@ public class GridContinuousProcessor extends GridProcessorAdapter { @Override public void writeExternal(ObjectOutput out) throws IOException { U.writeUuid(out, nodeId); U.writeCollection(out, items); - U.writeMap(out, clientInfos); } /** {@inheritDoc} */ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { nodeId = U.readUuid(in); items = U.readCollection(in); - clientInfos = U.readMap(in); } /** {@inheritDoc} */