Repository: incubator-ignite Updated Branches: refs/heads/master 725d79ff4 -> c0c28abdd
interop .Net: Merge from sp31 (without .Net component). Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/c0c28abd Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/c0c28abd Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/c0c28abd Branch: refs/heads/master Commit: c0c28abdd32c9fe63866f28044aecbe3386c868f Parents: 725d79f Author: vozerov-gridgain <voze...@gridgain.com> Authored: Fri Jan 16 17:21:12 2015 +0400 Committer: vozerov-gridgain <voze...@gridgain.com> Committed: Fri Jan 16 17:21:12 2015 +0400 ---------------------------------------------------------------------- .../ignite/portables/PortableRawReader.java | 2 +- .../grid/dotnet/GridDotNetConfiguration.java | 14 +++- .../dotnet/GridDotNetPortableConfiguration.java | 28 +++----- .../GridDotNetPortableTypeConfiguration.java | 15 +--- .../org/gridgain/grid/kernal/GridGainEx.java | 76 ++++++++++++++------ .../grid/kernal/GridNodeAttributes.java | 5 +- .../processors/cache/GridCacheAdapter.java | 15 +++- .../processors/cache/GridCacheContext.java | 2 +- .../processors/cache/GridCacheEntryImpl.java | 2 +- .../GridCacheContinuousQueryAdapter.java | 13 +++- .../GridCacheContinuousQueryFilterEx.java | 32 +++++++++ .../GridCacheContinuousQueryHandler.java | 17 ++++- .../GridCacheContinuousQueryHandlerV3.java | 61 ++++++++++++++++ .../GridCacheContinuousQueryHandlerV4.java | 61 ++++++++++++++++ .../GridCacheContinuousQueryListener.java | 5 ++ .../GridCacheContinuousQueryManager.java | 55 ++++++++++---- .../interop/GridInteropProcessorAdapter.java | 3 - 17 files changed, 318 insertions(+), 88 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c0c28abd/modules/core/src/main/java/org/apache/ignite/portables/PortableRawReader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/portables/PortableRawReader.java b/modules/core/src/main/java/org/apache/ignite/portables/PortableRawReader.java index 67e9b13..5801aa8 100644 --- a/modules/core/src/main/java/org/apache/ignite/portables/PortableRawReader.java +++ b/modules/core/src/main/java/org/apache/ignite/portables/PortableRawReader.java @@ -112,7 +112,7 @@ public interface PortableRawReader { * @return Object. * @throws PortableException In case of error. */ - @Nullable public Object readObject() throws PortableException; + @Nullable public <T> T readObject() throws PortableException; /** * @return Byte array. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c0c28abd/modules/core/src/main/java/org/gridgain/grid/dotnet/GridDotNetConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/dotnet/GridDotNetConfiguration.java b/modules/core/src/main/java/org/gridgain/grid/dotnet/GridDotNetConfiguration.java index 07a1a0d..a8a595d 100644 --- a/modules/core/src/main/java/org/gridgain/grid/dotnet/GridDotNetConfiguration.java +++ b/modules/core/src/main/java/org/gridgain/grid/dotnet/GridDotNetConfiguration.java @@ -41,7 +41,8 @@ public class GridDotNetConfiguration implements PortableMarshalAware { /** * Copy constructor. - * @param cfg configuration to copy. + * + * @param cfg Configuration to copy. */ public GridDotNetConfiguration(GridDotNetConfiguration cfg) { if (cfg.getPortableConfiguration() != null) @@ -80,6 +81,15 @@ public class GridDotNetConfiguration implements PortableMarshalAware { this.assemblies = assemblies; } + /** + * Copy configuration. + * + * @return Copied configuration. + */ + public GridDotNetConfiguration copy() { + return new GridDotNetConfiguration(this); + } + /** {@inheritDoc} */ @Override public void writePortable(PortableWriter writer) throws PortableException { PortableRawWriter rawWriter = writer.rawWriter(); @@ -92,7 +102,7 @@ public class GridDotNetConfiguration implements PortableMarshalAware { @Override public void readPortable(PortableReader reader) throws PortableException { PortableRawReader rawReader = reader.rawReader(); - portableCfg = (GridDotNetPortableConfiguration)rawReader.readObject(); + portableCfg = rawReader.readObject(); assemblies = (List<String>)rawReader.<String>readCollection(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c0c28abd/modules/core/src/main/java/org/gridgain/grid/dotnet/GridDotNetPortableConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/dotnet/GridDotNetPortableConfiguration.java b/modules/core/src/main/java/org/gridgain/grid/dotnet/GridDotNetPortableConfiguration.java index ec5fb06..6398d0b 100644 --- a/modules/core/src/main/java/org/gridgain/grid/dotnet/GridDotNetPortableConfiguration.java +++ b/modules/core/src/main/java/org/gridgain/grid/dotnet/GridDotNetPortableConfiguration.java @@ -45,7 +45,7 @@ public class GridDotNetPortableConfiguration implements PortableMarshalAware { private boolean dfltMetadataEnabled = true; /** Whether to cache deserialized value in IGridPortableObject */ - private boolean keepDeserialized = true; + private boolean dfltKeepDeserialized = true; /** * Default constructor. @@ -73,7 +73,7 @@ public class GridDotNetPortableConfiguration implements PortableMarshalAware { dfltIdMapper = cfg.getDefaultIdMapper(); dfltSerializer = cfg.getDefaultSerializer(); dfltMetadataEnabled = cfg.getDefaultMetadataEnabled(); - keepDeserialized = cfg.getKeepDeserialized(); + dfltKeepDeserialized = cfg.getDefaultKeepDeserialized(); } /** @@ -163,15 +163,15 @@ public class GridDotNetPortableConfiguration implements PortableMarshalAware { /** * @return Flag indicates whether to cache deserialized value in IGridPortableObject. */ - public boolean getKeepDeserialized() { - return keepDeserialized; + public boolean getDefaultKeepDeserialized() { + return dfltKeepDeserialized; } /** * @param keepDeserialized Keep deserialized flag. */ - public void setKeepDeserialized(boolean keepDeserialized) { - this.keepDeserialized = keepDeserialized; + public void setDefaultKeepDeserialized(boolean keepDeserialized) { + this.dfltKeepDeserialized = keepDeserialized; } /** {@inheritDoc} */ @@ -179,18 +179,12 @@ public class GridDotNetPortableConfiguration implements PortableMarshalAware { PortableRawWriter rawWriter = writer.rawWriter(); rawWriter.writeCollection(typesCfg); - rawWriter.writeCollection(types); - rawWriter.writeString(dfltNameMapper); - rawWriter.writeString(dfltIdMapper); - rawWriter.writeString(dfltSerializer); - rawWriter.writeBoolean(dfltMetadataEnabled); - - rawWriter.writeBoolean(keepDeserialized); + rawWriter.writeBoolean(dfltKeepDeserialized); } /** {@inheritDoc} */ @@ -198,18 +192,12 @@ public class GridDotNetPortableConfiguration implements PortableMarshalAware { PortableRawReader rawReader = reader.rawReader(); typesCfg = rawReader.readCollection(); - types = rawReader.readCollection(); - dfltNameMapper = rawReader.readString(); - dfltIdMapper = rawReader.readString(); - dfltSerializer = rawReader.readString(); - dfltMetadataEnabled = rawReader.readBoolean(); - - keepDeserialized = rawReader.readBoolean(); + dfltKeepDeserialized = rawReader.readBoolean(); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c0c28abd/modules/core/src/main/java/org/gridgain/grid/dotnet/GridDotNetPortableTypeConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/dotnet/GridDotNetPortableTypeConfiguration.java b/modules/core/src/main/java/org/gridgain/grid/dotnet/GridDotNetPortableTypeConfiguration.java index 18e5a06..7e72ff3 100644 --- a/modules/core/src/main/java/org/gridgain/grid/dotnet/GridDotNetPortableTypeConfiguration.java +++ b/modules/core/src/main/java/org/gridgain/grid/dotnet/GridDotNetPortableTypeConfiguration.java @@ -68,6 +68,7 @@ public class GridDotNetPortableTypeConfiguration implements PortableMarshalAware serializer = cfg.getSerializer(); affinityKeyFieldName = cfg.getAffinityKeyFieldName(); metadataEnabled = cfg.getMetadataEnabled(); + keepDeserialized = cfg.isKeepDeserialized(); } /** @@ -187,19 +188,12 @@ public class GridDotNetPortableTypeConfiguration implements PortableMarshalAware PortableRawWriter rawWriter = writer.rawWriter(); rawWriter.writeString(assemblyName); - rawWriter.writeString(typeName); - rawWriter.writeString(nameMapper); - rawWriter.writeString(idMapper); - rawWriter.writeString(serializer); - rawWriter.writeString(affinityKeyFieldName); - rawWriter.writeObject(metadataEnabled); - rawWriter.writeObject(keepDeserialized); } @@ -208,19 +202,12 @@ public class GridDotNetPortableTypeConfiguration implements PortableMarshalAware PortableRawReader rawReader = reader.rawReader(); assemblyName = rawReader.readString(); - typeName = rawReader.readString(); - nameMapper = rawReader.readString(); - idMapper = rawReader.readString(); - serializer = rawReader.readString(); - affinityKeyFieldName = rawReader.readString(); - metadataEnabled = (Boolean)rawReader.readObject(); - keepDeserialized = (Boolean)rawReader.readObject(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c0c28abd/modules/core/src/main/java/org/gridgain/grid/kernal/GridGainEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/GridGainEx.java b/modules/core/src/main/java/org/gridgain/grid/kernal/GridGainEx.java index 5028599..81a8c1f 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/GridGainEx.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/GridGainEx.java @@ -525,15 +525,15 @@ public class GridGainEx { * * @param springCfgPath Spring config path. * @param gridName Grid name. - * @param envPtr Environment pointer. + * @param cfgClo Configuration closure. * @return Started Grid. * @throws IgniteCheckedException If failed. */ - public static Ignite startInterop(@Nullable String springCfgPath, @Nullable String gridName, long envPtr) - throws IgniteCheckedException { - GridInteropProcessorAdapter.ENV_PTR.set(envPtr); + public static Ignite startInterop(@Nullable String springCfgPath, @Nullable String gridName, + IgniteClosure<IgniteConfiguration, IgniteConfiguration> cfgClo) throws IgniteCheckedException { + URL url = resolveSpringUrl(springCfgPath); - return start(springCfgPath, gridName); + return start(url, gridName, null, cfgClo); } /** @@ -654,21 +654,7 @@ public class GridGainEx { */ public static Ignite start(String springCfgPath, @Nullable String gridName, @Nullable GridSpringResourceContext springCtx) throws IgniteCheckedException { - A.notNull(springCfgPath, "springCfgPath"); - - URL url; - - try { - url = new URL(springCfgPath); - } - catch (MalformedURLException e) { - url = U.resolveGridGainUrl(springCfgPath); - - if (url == null) - throw new IgniteCheckedException("Spring XML configuration path is invalid: " + springCfgPath + - ". Note that this path should be either absolute or a relative local file system path, " + - "relative to META-INF in classpath or valid URL to GRIDGAIN_HOME.", e); - } + URL url = resolveSpringUrl(springCfgPath); return start(url, gridName, springCtx); } @@ -716,6 +702,23 @@ public class GridGainEx { */ public static Ignite start(URL springCfgUrl, @Nullable String gridName, @Nullable GridSpringResourceContext springCtx) throws IgniteCheckedException { + return start(springCfgUrl, gridName, springCtx, null); + } + + /** + * Internal Spring-based start routine. + * + * @param springCfgUrl Spring XML configuration file URL. This cannot be {@code null}. + * @param gridName Grid name that will override default. + * @param springCtx Optional Spring application context. + * @param cfgClo Optional closure to change configuration before it is used to start the grid. + * @return Started grid. + * @throws IgniteCheckedException If failed. + */ + private static Ignite start(URL springCfgUrl, @Nullable String gridName, + @Nullable GridSpringResourceContext springCtx, + @Nullable IgniteClosure<IgniteConfiguration, IgniteConfiguration> cfgClo) + throws IgniteCheckedException { A.notNull(springCfgUrl, "springCfgUrl"); boolean isLog4jUsed = U.gridClassLoader().getResource("org/apache/log4j/Appender.class") != null; @@ -751,6 +754,12 @@ public class GridGainEx { if (cfg.getGridName() == null && !F.isEmpty(gridName)) cfg.setGridName(gridName); + if (cfgClo != null) { + cfg = cfgClo.apply(cfg); + + assert cfg != null; + } + // Use either user defined context or our one. GridNamedInstance grid = start0( new GridStartContext(cfg, springCfgUrl, springCtx == null ? cfgMap.get2() : springCtx)); @@ -781,6 +790,33 @@ public class GridGainEx { } /** + * Resolve Spring configuration URL. + * + * @param springCfgPath Spring XML configuration file path or URL. This cannot be {@code null}. + * @return URL. + * @throws IgniteCheckedException If failed. + */ + private static URL resolveSpringUrl(String springCfgPath) throws IgniteCheckedException { + A.notNull(springCfgPath, "springCfgPath"); + + URL url; + + try { + url = new URL(springCfgPath); + } + catch (MalformedURLException e) { + url = U.resolveGridGainUrl(springCfgPath); + + if (url == null) + throw new IgniteCheckedException("Spring XML configuration path is invalid: " + springCfgPath + + ". Note that this path should be either absolute or a relative local file system path, " + + "relative to META-INF in classpath or valid URL to GRIDGAIN_HOME.", e); + } + + return url; + } + + /** * Starts grid with given configuration. * * @param startCtx Start context. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c0c28abd/modules/core/src/main/java/org/gridgain/grid/kernal/GridNodeAttributes.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/GridNodeAttributes.java b/modules/core/src/main/java/org/gridgain/grid/kernal/GridNodeAttributes.java index f77be58..da818d4 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/GridNodeAttributes.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/GridNodeAttributes.java @@ -22,7 +22,7 @@ package org.gridgain.grid.kernal; */ public final class GridNodeAttributes { /** Prefix for internally reserved attribute names. */ - static final String ATTR_PREFIX = "org.gridgain"; + public static final String ATTR_PREFIX = "org.gridgain"; /** Node compound version. */ public static final String ATTR_BUILD_VER = ATTR_PREFIX + ".build.ver"; @@ -136,9 +136,6 @@ public final class GridNodeAttributes { /** Cache interceptors. */ public static final String ATTR_CACHE_INTERCEPTORS = ATTR_PREFIX + ".cache.interceptors"; - /** Native platform. */ - public static final String ATTR_INTEROP_PLATFORM = ATTR_PREFIX + ".interop.platform"; - /** * Enforces singleton. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c0c28abd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java index 97d914a..991cc74 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java @@ -399,7 +399,18 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im /** {@inheritDoc} */ @Override public <K1, V1> GridCacheProjection<K1, V1> keepPortable() { - GridCacheProjectionImpl<K1, V1> prj = new GridCacheProjectionImpl<>( + GridCacheProjectionImpl<K1, V1> prj = keepPortable0(); + + return new GridCacheProxyImpl<>((GridCacheContext<K1, V1>)ctx, prj, prj); + } + + /** + * Internal routine to get "keep-portable" projection. + * + * @return Projection with "keep-portable" flag. + */ + public <K1, V1> GridCacheProjectionImpl<K1, V1> keepPortable0() { + return new GridCacheProjectionImpl<>( (GridCacheProjection<K1, V1>)this, (GridCacheContext<K1, V1>)ctx, null, @@ -407,8 +418,6 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im null, null, ctx.portableEnabled()); - - return new GridCacheProxyImpl<>((GridCacheContext<K1, V1>)ctx, prj, prj); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c0c28abd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java index 841f961..ff34474 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java @@ -1063,7 +1063,7 @@ public class GridCacheContext<K, V> implements Externalizable { * * @param prj Flags to set. */ - void projectionPerCall(@Nullable GridCacheProjectionImpl<K, V> prj) { + public void projectionPerCall(@Nullable GridCacheProjectionImpl<K, V> prj) { if (nearContext()) dht().near().context().prjPerCall.set(prj); else http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c0c28abd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryImpl.java index d27ac8f..adfab5e 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryImpl.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryImpl.java @@ -406,7 +406,7 @@ public class GridCacheEntryImpl<K, V> implements GridCacheEntry<K, V>, Externali /** {@inheritDoc} */ @Nullable @Override public V get() throws IgniteCheckedException { - return proxy.get(key, isNearEnabled(ctx) ? null : cached, true); + return proxy.get(key, isNearEnabled(ctx) ? null : cached, !ctx.keepPortable()); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c0c28abd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryAdapter.java index 66defaa..e881aa9 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryAdapter.java @@ -20,7 +20,6 @@ package org.gridgain.grid.kernal.processors.cache.query.continuous; import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.lang.*; -import org.gridgain.grid.*; import org.gridgain.grid.cache.*; import org.gridgain.grid.cache.query.*; import org.gridgain.grid.cache.query.GridCacheContinuousQueryEntry; @@ -56,6 +55,9 @@ public class GridCacheContinuousQueryAdapter<K, V> implements GridCacheContinuou /** Projection predicate */ private final IgnitePredicate<GridCacheEntry<K, V>> prjPred; + /** Keep portable flag. */ + private final boolean keepPortable; + /** Logger. */ private final IgniteLogger log; @@ -98,6 +100,8 @@ public class GridCacheContinuousQueryAdapter<K, V> implements GridCacheContinuou this.topic = topic; this.prjPred = prjPred; + keepPortable = ctx.keepPortable(); + log = ctx.logger(getClass()); } @@ -279,9 +283,12 @@ public class GridCacheContinuousQueryAdapter<K, V> implements GridCacheContinuou guard.block(); - GridContinuousHandler hnd = ctx.kernalContext().security().enabled() ? - new GridCacheContinuousQueryHandlerV2<>(ctx.name(), topic, locCb, rmtFilter, prjPred, internal, + GridContinuousHandler hnd = ctx.kernalContext().security().enabled() ? keepPortable ? + new GridCacheContinuousQueryHandlerV4<>(ctx.name(), topic, locCb, rmtFilter, prjPred, internal, ctx.kernalContext().job().currentTaskNameHash()) : + new GridCacheContinuousQueryHandlerV2<>(ctx.name(), topic, locCb, rmtFilter, prjPred, internal, + ctx.kernalContext().job().currentTaskNameHash()) : keepPortable ? + new GridCacheContinuousQueryHandlerV3<>(ctx.name(), topic, locCb, rmtFilter, prjPred, internal) : new GridCacheContinuousQueryHandler<>(ctx.name(), topic, locCb, rmtFilter, prjPred, internal); routineId = ctx.kernalContext().continuous().startRoutine(hnd, bufSize, timeInterval, autoUnsubscribe, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c0c28abd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryFilterEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryFilterEx.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryFilterEx.java new file mode 100644 index 0000000..21266b7 --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryFilterEx.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.gridgain.grid.kernal.processors.cache.query.continuous; + + +import org.apache.ignite.lang.*; + +/** + * Extended continuous query filter. + */ +public interface GridCacheContinuousQueryFilterEx<K, V> extends + IgnitePredicate<org.gridgain.grid.cache.query.GridCacheContinuousQueryEntry<K, V>> { + /** + * Callback for query unregister event. + */ + public void onQueryUnregister(); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c0c28abd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandler.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandler.java index 61f0098..a7ff429 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandler.java @@ -77,6 +77,8 @@ class GridCacheContinuousQueryHandler<K, V> implements GridContinuousHandler { } /** + * Constructor. + * * @param cacheName Cache name. * @param topic Topic for ordered messages. * @param cb Local callback. @@ -214,6 +216,12 @@ class GridCacheContinuousQueryHandler<K, V> implements GridContinuousHandler { } } + /** {@inheritDoc} */ + @Override public void onUnregister() { + if (filter != null && filter instanceof GridCacheContinuousQueryFilterEx) + ((GridCacheContinuousQueryFilterEx)filter).onQueryUnregister(); + } + private boolean checkProjection(GridCacheContinuousQueryEntry<K, V> e) { GridCacheProjectionImpl.FullFilter<K, V> filter = (GridCacheProjectionImpl.FullFilter<K, V>)prjPred; @@ -254,7 +262,7 @@ class GridCacheContinuousQueryHandler<K, V> implements GridContinuousHandler { /** {@inheritDoc} */ @Override public void onListenerRegistered(UUID routineId, GridKernalContext ctx) { - manager(ctx).iterate(internal, routineId); + manager(ctx).iterate(internal, routineId, keepPortable()); } /** {@inheritDoc} */ @@ -415,6 +423,13 @@ class GridCacheContinuousQueryHandler<K, V> implements GridContinuousHandler { } /** + * @return Keep portable flag. + */ + protected boolean keepPortable() { + return false; + } + + /** * Deployable object. */ private static class DeployableObject implements Externalizable { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c0c28abd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandlerV3.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandlerV3.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandlerV3.java new file mode 100644 index 0000000..e008586 --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandlerV3.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.gridgain.grid.kernal.processors.cache.query.continuous; + +import org.apache.ignite.lang.*; +import org.gridgain.grid.cache.*; +import org.gridgain.grid.cache.query.GridCacheContinuousQueryEntry; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; + +/** + * Continuous query handler used when "keepPortable" flag is set. + */ +public class GridCacheContinuousQueryHandlerV3<K, V> extends GridCacheContinuousQueryHandler<K, V> { + /** */ + private static final long serialVersionUID = 0L; + + /** + * For {@link Externalizable}. + */ + public GridCacheContinuousQueryHandlerV3() { + // No-op. + } + + /** + * @param cacheName Cache name. + * @param topic Topic for ordered messages. + * @param cb Local callback. + * @param filter Filter. + * @param prjPred Projection predicate. + * @param internal If {@code true} then query is notified about internal entries updates. + */ + public GridCacheContinuousQueryHandlerV3(@Nullable String cacheName, Object topic, + IgniteBiPredicate<UUID, Collection<GridCacheContinuousQueryEntry<K, V>>> cb, + @Nullable IgnitePredicate<GridCacheContinuousQueryEntry<K, V>> filter, + @Nullable IgnitePredicate<GridCacheEntry<K, V>> prjPred, boolean internal) { + super(cacheName, topic, cb, filter, prjPred, internal); + } + + /** {@inheritDoc} */ + @Override protected boolean keepPortable() { + return true; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c0c28abd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandlerV4.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandlerV4.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandlerV4.java new file mode 100644 index 0000000..a183cce --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandlerV4.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.gridgain.grid.kernal.processors.cache.query.continuous; + +import org.apache.ignite.lang.*; +import org.gridgain.grid.cache.*; +import org.gridgain.grid.cache.query.GridCacheContinuousQueryEntry; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; + +/** + * Continuous query handler used when "keepPortable" flag is set and security is enabled. + */ +public class GridCacheContinuousQueryHandlerV4<K, V> extends GridCacheContinuousQueryHandlerV2<K, V> { + /** */ + private static final long serialVersionUID = 0L; + + /** + * For {@link Externalizable}. + */ + public GridCacheContinuousQueryHandlerV4() { + // No-op. + } + + /** + * @param cacheName Cache name. + * @param topic Topic for ordered messages. + * @param cb Local callback. + * @param filter Filter. + * @param prjPred Projection predicate. + * @param internal If {@code true} then query is notified about internal entries updates. + * @param taskHash Task hash. + */ + public GridCacheContinuousQueryHandlerV4(@Nullable String cacheName, Object topic, + IgniteBiPredicate<UUID, Collection<GridCacheContinuousQueryEntry<K, V>>> cb, + @Nullable IgnitePredicate<GridCacheContinuousQueryEntry<K, V>> filter, + @Nullable IgnitePredicate<GridCacheEntry<K, V>> prjPred, boolean internal, int taskHash) { + super(cacheName, topic, cb, filter, prjPred, internal, taskHash); + } + + /** {@inheritDoc} */ + @Override protected boolean keepPortable() { + return true; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c0c28abd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryListener.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryListener.java index 461fc0e..a0cf134 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryListener.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryListener.java @@ -33,4 +33,9 @@ interface GridCacheContinuousQueryListener<K, V> { * @param recordEvt Whether to record event. */ public void onEntryUpdate(GridCacheContinuousQueryEntry<K, V> e, boolean recordEvt); + + /** + * Listener unregistered callback. + */ + public void onUnregister(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c0c28abd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryManager.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryManager.java index ac5bce0..2d8e106 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryManager.java @@ -129,6 +129,7 @@ public class GridCacheContinuousQueryManager<K, V> extends GridCacheManagerAdapt * @param internal Internal flag. * @return Whether listener was actually registered. */ + @SuppressWarnings("UnusedParameters") boolean registerListener(UUID nodeId, UUID lsnrId, GridCacheContinuousQueryListener<K, V> lsnr, boolean internal) { ListenerInfo<K, V> info = new ListenerInfo<>(lsnr); @@ -158,13 +159,21 @@ public class GridCacheContinuousQueryManager<K, V> extends GridCacheManagerAdapt * @param id Listener ID. */ void unregisterListener(boolean internal, UUID id) { + ListenerInfo info; + if (internal) { - if (intLsnrs.remove(id) != null) + if ((info = intLsnrs.remove(id)) != null) { intLsnrCnt.decrementAndGet(); + + info.lsnr.onUnregister(); + } } else { - if (lsnrs.remove(id) != null) + if ((info = lsnrs.remove(id)) != null) { lsnrCnt.decrementAndGet(); + + info.lsnr.onUnregister(); + } } } @@ -173,27 +182,43 @@ public class GridCacheContinuousQueryManager<K, V> extends GridCacheManagerAdapt * * @param internal Internal flag. * @param id Listener ID. + * @param keepPortable Keep portable flag. */ - void iterate(boolean internal, UUID id) { + @SuppressWarnings("unchecked") + void iterate(boolean internal, UUID id, boolean keepPortable) { ListenerInfo<K, V> info = internal ? intLsnrs.get(id) : lsnrs.get(id); assert info != null; - Set<GridCacheEntry<K, V>> entries; + GridCacheProjectionImpl<K, V> oldPrj = null; - if (cctx.isReplicated()) - entries = internal ? cctx.cache().entrySetx() : - cctx.cache().entrySet(); - else - entries = internal ? cctx.cache().primaryEntrySetx() : - cctx.cache().primaryEntrySet(); + try { + if (keepPortable) { + oldPrj = cctx.projectionPerCall(); - for (GridCacheEntry<K, V> e : entries) { - info.onIterate(new GridCacheContinuousQueryEntry<>(cctx, e, e.getKey(), e.getValue(), null, null, null), - !internal && cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ)); - } + cctx.projectionPerCall(cctx.cache().<K, V>keepPortable0()); + } + + Set<GridCacheEntry<K, V>> entries; - info.flushPending(); + if (cctx.isReplicated()) + entries = internal ? cctx.cache().entrySetx() : + cctx.cache().entrySet(); + else + entries = internal ? cctx.cache().primaryEntrySetx() : + cctx.cache().primaryEntrySet(); + + for (GridCacheEntry<K, V> e : entries) { + info.onIterate(new GridCacheContinuousQueryEntry<>(cctx, e, e.getKey(), e.getValue(), null, null, null), + !internal && cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ)); + } + + info.flushPending(); + } + finally { + if (keepPortable) + cctx.projectionPerCall(oldPrj); + } } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c0c28abd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/interop/GridInteropProcessorAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/interop/GridInteropProcessorAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/interop/GridInteropProcessorAdapter.java index f2d76f5..e86826e 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/interop/GridInteropProcessorAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/interop/GridInteropProcessorAdapter.java @@ -24,9 +24,6 @@ import org.gridgain.grid.kernal.processors.*; * Interop processor adapter. */ public abstract class GridInteropProcessorAdapter extends GridProcessorAdapter implements GridInteropProcessor { - /** Managed environment pointer. */ - public static final ThreadLocal<Long> ENV_PTR = new ThreadLocal<>(); - /** {@inheritDoc} */ protected GridInteropProcessorAdapter(GridKernalContext ctx) { super(ctx);