GG-9613 Interop .Net: Implement GridMessaging API. - done
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/9de5e90b Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/9de5e90b Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/9de5e90b Branch: refs/heads/ignite-709_1 Commit: 9de5e90b4b283434da124eed925c947116e9ead4 Parents: 702f172 Author: ptupitsyn <ptupit...@gridgain.com> Authored: Wed Apr 22 13:31:45 2015 +0300 Committer: ptupitsyn <ptupit...@gridgain.com> Committed: Wed Apr 22 13:31:45 2015 +0300 ---------------------------------------------------------------------- .../managers/communication/GridIoManager.java | 33 ++++++++++++++++-- .../GridLifecycleAwareMessageFilter.java | 35 ++++++++++++++++++++ 2 files changed, 65 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9de5e90b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index f0d595b..c2df045 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -1383,7 +1383,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa * @return Whether or not the lsnr was removed. */ @SuppressWarnings({"deprecation", "SynchronizationOnLocalVariableOrMethodParameter"}) - public boolean removeMessageListener(Object topic, @Nullable final GridMessageListener lsnr) { + public boolean removeMessageListener(Object topic, @Nullable GridMessageListener lsnr) { assert topic != null; boolean rmv = true; @@ -1394,7 +1394,9 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa if (lsnr == null) { closedTopics.add(topic); - rmv = lsnrMap.remove(topic) != null; + lsnr = lsnrMap.remove(topic); + + rmv = lsnr != null; Map<UUID, GridCommunicationMessageSet> map = msgSetMap.remove(topic); @@ -1466,10 +1468,31 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa if (rmv && log.isDebugEnabled()) log.debug("Removed message listener [topic=" + topic + ", lsnr=" + lsnr + ']'); + if (lsnr instanceof ArrayListener) + { + for (GridMessageListener childLsnr : ((ArrayListener)lsnr).arr) + closeListener(childLsnr); + } + else + closeListener(lsnr); + return rmv; } /** + * Closes a listener, if applicable. + * @param lsnr Listener. + */ + private void closeListener(GridMessageListener lsnr) { + if (lsnr instanceof GridUserMessageListener) { + GridUserMessageListener userLsnr = (GridUserMessageListener)lsnr; + + if (userLsnr.predLsnr instanceof GridLifecycleAwareMessageFilter) + ((GridLifecycleAwareMessageFilter)userLsnr.predLsnr).close(); + } + } + + /** * Gets sent messages count. * * @return Sent messages count. @@ -1647,8 +1670,12 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa this.topic = topic; this.predLsnr = predLsnr; - if (predLsnr != null) + if (predLsnr != null) { ctx.resource().injectGeneric(predLsnr); + + if (predLsnr instanceof GridLifecycleAwareMessageFilter) + ((GridLifecycleAwareMessageFilter)predLsnr).initialize(); + } } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9de5e90b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridLifecycleAwareMessageFilter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridLifecycleAwareMessageFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridLifecycleAwareMessageFilter.java new file mode 100644 index 0000000..cb99d2e --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridLifecycleAwareMessageFilter.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.managers.communication; + +import org.apache.ignite.lang.*; + +/** + * Special version of bi-predicate for messaging with initialize/close callbacks. + */ +public interface GridLifecycleAwareMessageFilter<K, V> extends IgniteBiPredicate<K, V> { + /** + * Initializes the filter. + */ + public void initialize(); + + /** + * Closes the filter. + */ + public void close(); +}