GG-9614 Interop .Net: Implement GridEvents API. - done
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/26a713c8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/26a713c8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/26a713c8 Branch: refs/heads/ignite-gg-10249 Commit: 26a713c84b96bb0d89b802bd2ab3cd1319da0e2c Parents: 3f7a80a Author: ptupitsyn <ptupit...@gridgain.com> Authored: Tue May 19 18:26:20 2015 +0300 Committer: ptupitsyn <ptupit...@gridgain.com> Committed: Tue May 19 18:26:20 2015 +0300 ---------------------------------------------------------------------- .../internal/GridEventConsumeHandler.java | 26 ++++++++++++++ .../interop/InteropAwareEventFilter.java | 37 ++++++++++++++++++++ .../interop/InteropLocalEventListener.java | 28 +++++++++++++++ .../eventstorage/GridEventStorageManager.java | 24 ++++++++++++- 4 files changed, 114 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/26a713c8/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java index c60646e..505204d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal; import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.events.*; +import org.apache.ignite.internal.interop.*; import org.apache.ignite.internal.managers.deployment.*; import org.apache.ignite.internal.managers.discovery.*; import org.apache.ignite.internal.managers.eventstorage.*; @@ -124,6 +125,9 @@ class GridEventConsumeHandler implements GridContinuousHandler { if (filter != null) ctx.resource().injectGeneric(filter); + if (filter instanceof InteropAwareEventFilter) + ((InteropAwareEventFilter)filter).initialize(ctx); + final boolean loc = nodeId.equals(ctx.localNodeId()); lsnr = new GridLocalEventListener() { @@ -188,6 +192,28 @@ class GridEventConsumeHandler implements GridContinuousHandler { if (lsnr != null) ctx.event().removeLocalEventListener(lsnr, types); + + RuntimeException err = null; + + try { + if (filter instanceof InteropAwareEventFilter) + ((InteropAwareEventFilter)filter).close(); + } + catch(RuntimeException ex) { + err = ex; + } + + try { + if (cb instanceof InteropLocalEventListener) + ((InteropLocalEventListener)cb).close(); + } + catch (RuntimeException ex) { + if (err == null) + err = ex; + } + + if (err != null) + throw err; } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/26a713c8/modules/core/src/main/java/org/apache/ignite/internal/interop/InteropAwareEventFilter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/interop/InteropAwareEventFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/interop/InteropAwareEventFilter.java new file mode 100644 index 0000000..8dbc73b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/interop/InteropAwareEventFilter.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.interop; + +import org.apache.ignite.events.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.lang.*; + +/** + * Special version of predicate for events with initialize/close callbacks. + */ +public interface InteropAwareEventFilter<E extends Event> extends IgnitePredicate<E> { + /** + * Initializes the filter. + */ + public void initialize(GridKernalContext ctx); + + /** + * Closes the filter. + */ + public void close(); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/26a713c8/modules/core/src/main/java/org/apache/ignite/internal/interop/InteropLocalEventListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/interop/InteropLocalEventListener.java b/modules/core/src/main/java/org/apache/ignite/internal/interop/InteropLocalEventListener.java new file mode 100644 index 0000000..180863b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/interop/InteropLocalEventListener.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.interop; + +/** + * Special version of listener for events with close callbacks. + */ +public interface InteropLocalEventListener { + /** + * Closes the listener. + */ + public void close(); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/26a713c8/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java index 10cc99a..95c5eb1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java @@ -22,6 +22,7 @@ import org.apache.ignite.cluster.*; import org.apache.ignite.events.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.events.*; +import org.apache.ignite.internal.interop.*; import org.apache.ignite.internal.managers.*; import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.managers.deployment.*; @@ -650,6 +651,14 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi> } } + if (lsnr instanceof UserListenerWrapper) + { + IgnitePredicate p = ((UserListenerWrapper)lsnr).listener(); + + if (p instanceof InteropLocalEventListener) + ((InteropLocalEventListener)p).close(); + } + return found; } @@ -752,7 +761,20 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi> public <T extends Event> Collection<T> localEvents(IgnitePredicate<T> p) { assert p != null; - return getSpi().localEvents(p); + if (p instanceof InteropAwareEventFilter) { + InteropAwareEventFilter p0 = (InteropAwareEventFilter)p; + + p0.initialize(ctx); + + try { + return getSpi().localEvents(p0); + } + finally { + p0.close(); + } + } + else + return getSpi().localEvents(p); } /**