http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f649be2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java new file mode 100644 index 0000000..4ad664a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -0,0 +1,490 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.events.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.managers.deployment.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.query.*; +import org.apache.ignite.internal.processors.continuous.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import javax.cache.event.*; +import javax.cache.event.EventType; +import java.io.*; +import java.util.*; + +import static org.apache.ignite.events.EventType.*; + +/** + * Continuous query handler. + */ +class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { + /** */ + private static final long serialVersionUID = 0L; + + /** Cache name. */ + private String cacheName; + + /** Topic for ordered messages. */ + private Object topic; + + /** Local listener. */ + private transient CacheEntryUpdatedListener<K, V> locLsnr; + + /** Remote filter. */ + private CacheEntryEventFilter<K, V> rmtFilter; + + /** Deployable object for filter. */ + private DeployableObject rmtFilterDep; + + /** Internal flag. */ + private boolean internal; + + /** Old value required flag. */ + private boolean oldValRequired; + + /** Synchronous flag. */ + private boolean sync; + + /** Ignore expired events flag. */ + private boolean ignoreExpired; + + /** Task name hash code. */ + private int taskHash; + + /** Whether to skip primary check for REPLICATED cache. */ + private transient boolean skipPrimaryCheck; + + /** + * Required by {@link Externalizable}. + */ + public CacheContinuousQueryHandler() { + // No-op. + } + + /** + * Constructor. + * + * @param cacheName Cache name. + * @param topic Topic for ordered messages. + * @param locLsnr Local listener. + * @param rmtFilter Remote filter. + * @param internal Internal flag. + * @param oldValRequired Old value required flag. + * @param sync Synchronous flag. + * @param ignoreExpired Ignore expired events flag. + * @param skipPrimaryCheck Whether to skip primary check for REPLICATED cache. + * @param taskHash Task name hash code. + */ + public CacheContinuousQueryHandler( + String cacheName, + Object topic, + CacheEntryUpdatedListener<K, V> locLsnr, + CacheEntryEventFilter<K, V> rmtFilter, + boolean internal, + boolean oldValRequired, + boolean sync, + boolean ignoreExpired, + int taskHash, + boolean skipPrimaryCheck) { + assert topic != null; + assert locLsnr != null; + + this.cacheName = cacheName; + this.topic = topic; + this.locLsnr = locLsnr; + this.rmtFilter = rmtFilter; + this.internal = internal; + this.oldValRequired = oldValRequired; + this.sync = sync; + this.ignoreExpired = ignoreExpired; + this.taskHash = taskHash; + this.skipPrimaryCheck = skipPrimaryCheck; + } + + /** {@inheritDoc} */ + @Override public boolean isForEvents() { + return false; + } + + /** {@inheritDoc} */ + @Override public boolean isForMessaging() { + return false; + } + + /** {@inheritDoc} */ + @Override public boolean isForQuery() { + return true; + } + + /** {@inheritDoc} */ + @Override public boolean register(final UUID nodeId, final UUID routineId, final GridKernalContext ctx) + throws IgniteCheckedException { + assert nodeId != null; + assert routineId != null; + assert ctx != null; + + if (locLsnr != null) + ctx.resource().injectGeneric(locLsnr); + + if (rmtFilter != null) + ctx.resource().injectGeneric(rmtFilter); + + final boolean loc = nodeId.equals(ctx.localNodeId()); + + CacheContinuousQueryListener<K, V> lsnr = new CacheContinuousQueryListener<K, V>() { + @Override public void onExecution() { + if (ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED)) { + ctx.event().record(new CacheQueryExecutedEvent<>( + ctx.discovery().localNode(), + "Continuous query executed.", + EVT_CACHE_QUERY_EXECUTED, + CacheQueryType.CONTINUOUS, + cacheName, + null, + null, + null, + rmtFilter, + null, + nodeId, + taskName() + )); + } + } + + @Override public void onEntryUpdated(CacheContinuousQueryEvent<K, V> evt, boolean primary, + boolean recordIgniteEvt) { + if (ignoreExpired && evt.getEventType() == EventType.EXPIRED) + return; + + GridCacheContext<K, V> cctx = cacheContext(ctx); + + if (cctx.isReplicated() && !skipPrimaryCheck && !primary) + return; + + boolean notify = true; + + if (rmtFilter != null) { + CacheFlag[] f = cctx.forceLocalRead(); + + try { + notify = rmtFilter.evaluate(evt); + } + finally { + cctx.forceFlags(f); + } + } + + if (notify) { + if (loc) + locLsnr.onUpdated(F.<CacheEntryEvent<? extends K, ? extends V>>asList(evt)); + else { + try { + ClusterNode node = ctx.discovery().node(nodeId); + + if (ctx.config().isPeerClassLoadingEnabled() && node != null && + U.hasCache(node, cacheName)) { + evt.entry().p2pMarshal(ctx.config().getMarshaller()); + + evt.entry().cacheName(cacheName); + + GridCacheDeploymentManager depMgr = + ctx.cache().internalCache(cacheName).context().deploy(); + + depMgr.prepare(evt.entry()); + } + + ctx.continuous().addNotification(nodeId, routineId, evt, topic, sync); + } + catch (IgniteCheckedException ex) { + U.error(ctx.log(getClass()), "Failed to send event notification to node: " + nodeId, ex); + } + } + + if (recordIgniteEvt) { + ctx.event().record(new CacheQueryReadEvent<>( + ctx.discovery().localNode(), + "Continuous query executed.", + EVT_CACHE_QUERY_OBJECT_READ, + CacheQueryType.CONTINUOUS, + cacheName, + null, + null, + null, + rmtFilter, + null, + nodeId, + taskName(), + evt.getKey(), + evt.getValue(), + evt.getOldValue(), + null + )); + } + } + } + + @Override public void onUnregister() { + if (rmtFilter instanceof CacheContinuousQueryFilterEx) + ((CacheContinuousQueryFilterEx)rmtFilter).onQueryUnregister(); + } + + @Override public boolean oldValueRequired() { + return oldValRequired; + } + + private String taskName() { + return ctx.security().enabled() ? ctx.task().resolveTaskName(taskHash) : null; + } + }; + + return manager(ctx).registerListener(routineId, lsnr, internal); + } + + /** {@inheritDoc} */ + @Override public void onListenerRegistered(UUID routineId, GridKernalContext ctx) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void unregister(UUID routineId, GridKernalContext ctx) { + assert routineId != null; + assert ctx != null; + + manager(ctx).unregisterListener(internal, routineId); + } + + /** + * @param ctx Kernal context. + * @return Continuous query manager. + */ + private CacheContinuousQueryManager<K, V> manager(GridKernalContext ctx) { + return cacheContext(ctx).continuousQueries(); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public void notifyCallback(UUID nodeId, UUID routineId, Collection<?> objs, GridKernalContext ctx) { + assert nodeId != null; + assert routineId != null; + assert objs != null; + assert ctx != null; + + Collection<CacheEntryEvent<? extends K, ? extends V>> evts = + (Collection<CacheEntryEvent<? extends K, ? extends V>>)objs; + + if (ctx.config().isPeerClassLoadingEnabled()) { + for (CacheEntryEvent<? extends K, ? extends V> evt : evts) { + assert evt instanceof CacheContinuousQueryEvent; + + CacheContinuousQueryEntry<? extends K, ? extends V> e = ((CacheContinuousQueryEvent)evt).entry(); + + GridCacheAdapter cache = ctx.cache().internalCache(e.cacheName()); + + ClassLoader ldr = null; + + if (cache != null) { + GridCacheDeploymentManager depMgr = cache.context().deploy(); + + GridDeploymentInfo depInfo = e.deployInfo(); + + if (depInfo != null) { + depMgr.p2pContext(nodeId, depInfo.classLoaderId(), depInfo.userVersion(), depInfo.deployMode(), + depInfo.participants(), depInfo.localDeploymentOwner()); + } + + ldr = depMgr.globalLoader(); + } + else { + U.warn(ctx.log(getClass()), "Received cache event for cache that is not configured locally " + + "when peer class loading is enabled: " + e.cacheName() + ". Will try to unmarshal " + + "with default class loader."); + } + + try { + e.p2pUnmarshal(ctx.config().getMarshaller(), ldr); + } + catch (IgniteCheckedException ex) { + U.error(ctx.log(getClass()), "Failed to unmarshal entry.", ex); + } + } + } + + locLsnr.onUpdated(evts); + } + + /** {@inheritDoc} */ + @Override public void p2pMarshal(GridKernalContext ctx) throws IgniteCheckedException { + assert ctx != null; + assert ctx.config().isPeerClassLoadingEnabled(); + + if (rmtFilter != null && !U.isGrid(rmtFilter.getClass())) + rmtFilterDep = new DeployableObject(rmtFilter, ctx); + } + + /** {@inheritDoc} */ + @Override public void p2pUnmarshal(UUID nodeId, GridKernalContext ctx) throws IgniteCheckedException { + assert nodeId != null; + assert ctx != null; + assert ctx.config().isPeerClassLoadingEnabled(); + + if (rmtFilterDep != null) + rmtFilter = rmtFilterDep.unmarshal(nodeId, ctx); + } + + /** {@inheritDoc} */ + @Nullable @Override public Object orderedTopic() { + return topic; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + U.writeString(out, cacheName); + out.writeObject(topic); + + boolean b = rmtFilterDep != null; + + out.writeBoolean(b); + + if (b) + out.writeObject(rmtFilterDep); + else + out.writeObject(rmtFilter); + + out.writeBoolean(internal); + out.writeBoolean(oldValRequired); + out.writeBoolean(sync); + out.writeBoolean(ignoreExpired); + out.writeInt(taskHash); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + cacheName = U.readString(in); + topic = in.readObject(); + + boolean b = in.readBoolean(); + + if (b) + rmtFilterDep = (DeployableObject)in.readObject(); + else + rmtFilter = (CacheEntryEventFilter<K, V>)in.readObject(); + + internal = in.readBoolean(); + oldValRequired = in.readBoolean(); + sync = in.readBoolean(); + ignoreExpired = in.readBoolean(); + taskHash = in.readInt(); + } + + /** + * @param ctx Kernal context. + * @return Cache context. + */ + private GridCacheContext<K, V> cacheContext(GridKernalContext ctx) { + assert ctx != null; + + return ctx.cache().<K, V>internalCache(cacheName).context(); + } + + /** + * Deployable object. + */ + private static class DeployableObject implements Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** Serialized object. */ + private byte[] bytes; + + /** Deployment class name. */ + private String clsName; + + /** Deployment info. */ + private GridDeploymentInfo depInfo; + + /** + * Required by {@link Externalizable}. + */ + public DeployableObject() { + // No-op. + } + + /** + * @param obj Object. + * @param ctx Kernal context. + * @throws IgniteCheckedException In case of error. + */ + private DeployableObject(Object obj, GridKernalContext ctx) throws IgniteCheckedException { + assert obj != null; + assert ctx != null; + + Class cls = U.detectClass(obj); + + clsName = cls.getName(); + + GridDeployment dep = ctx.deploy().deploy(cls, U.detectClassLoader(cls)); + + if (dep == null) + throw new IgniteDeploymentCheckedException("Failed to deploy object: " + obj); + + depInfo = new GridDeploymentInfoBean(dep); + + bytes = ctx.config().getMarshaller().marshal(obj); + } + + /** + * @param nodeId Node ID. + * @param ctx Kernal context. + * @return Deserialized object. + * @throws IgniteCheckedException In case of error. + */ + <T> T unmarshal(UUID nodeId, GridKernalContext ctx) throws IgniteCheckedException { + assert ctx != null; + + GridDeployment dep = ctx.deploy().getGlobalDeployment(depInfo.deployMode(), clsName, clsName, + depInfo.userVersion(), nodeId, depInfo.classLoaderId(), depInfo.participants(), null); + + if (dep == null) + throw new IgniteDeploymentCheckedException("Failed to obtain deployment for class: " + clsName); + + return ctx.config().getMarshaller().unmarshal(bytes, dep.classLoader()); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + U.writeByteArray(out, bytes); + U.writeString(out, clsName); + out.writeObject(depInfo); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + bytes = U.readByteArray(in); + clsName = U.readString(in); + depInfo = (GridDeploymentInfo)in.readObject(); + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f649be2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java new file mode 100644 index 0000000..3695bad --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +/** + * Continuous query listener. + */ +interface CacheContinuousQueryListener<K, V> { + /** + * Query execution callback. + */ + public void onExecution(); + + /** + * Entry update callback. + * + * @param evt Event + * @param primary Primary flag. + * @param recordIgniteEvt Whether to record event. + */ + public void onEntryUpdated(CacheContinuousQueryEvent<K, V> evt, boolean primary, boolean recordIgniteEvt); + + /** + * Listener unregistered callback. + */ + public void onUnregister(); + + /** + * @return Whether old value is required. + */ + public boolean oldValueRequired(); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f649be2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java new file mode 100644 index 0000000..c2352c2 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -0,0 +1,664 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.query.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.continuous.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.plugin.security.*; +import org.jdk8.backport.*; + +import javax.cache.*; +import javax.cache.configuration.*; +import javax.cache.event.*; +import java.io.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static javax.cache.event.EventType.*; +import static org.apache.ignite.cache.CacheDistributionMode.*; +import static org.apache.ignite.events.EventType.*; +import static org.apache.ignite.internal.GridTopic.*; + +/** + * Continuous queries manager. + */ +public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K, V> { + /** */ + private static final byte CREATED_FLAG = 0b0001; + + /** */ + private static final byte UPDATED_FLAG = 0b0010; + + /** */ + private static final byte REMOVED_FLAG = 0b0100; + + /** */ + private static final byte EXPIRED_FLAG = 0b1000; + + /** Listeners. */ + private final ConcurrentMap<UUID, CacheContinuousQueryListener<K, V>> lsnrs = new ConcurrentHashMap8<>(); + + /** Listeners count. */ + private final AtomicInteger lsnrCnt = new AtomicInteger(); + + /** Internal entries listeners. */ + private final ConcurrentMap<UUID, CacheContinuousQueryListener<K, V>> intLsnrs = new ConcurrentHashMap8<>(); + + /** Internal listeners count. */ + private final AtomicInteger intLsnrCnt = new AtomicInteger(); + + /** Query sequence number for message topic. */ + private final AtomicLong seq = new AtomicLong(); + + /** JCache listeners. */ + private final ConcurrentMap<CacheEntryListenerConfiguration, JCacheQuery> jCacheLsnrs = + new ConcurrentHashMap8<>(); + + /** Ordered topic prefix. */ + private String topicPrefix; + + /** {@inheritDoc} */ + @Override protected void start0() throws IgniteCheckedException { + // Append cache name to the topic. + topicPrefix = "CONTINUOUS_QUERY" + (cctx.name() == null ? "" : "_" + cctx.name()); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override protected void onKernalStart0() throws IgniteCheckedException { + Iterable<CacheEntryListenerConfiguration<K, V>> cfgs = cctx.config().getCacheEntryListenerConfigurations(); + + if (cfgs != null) { + for (CacheEntryListenerConfiguration<K, V> cfg : cfgs) + executeJCacheQuery(cfg, true); + } + } + + /** {@inheritDoc} */ + @Override protected void onKernalStop0(boolean cancel) { + super.onKernalStop0(cancel); + + for (JCacheQuery lsnr : jCacheLsnrs.values()) { + try { + lsnr.cancel(); + } + catch (IgniteCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to stop JCache entry listener: " + e.getMessage()); + } + } + } + + /** + * @param e Cache entry. + * @param key Key. + * @param newVal New value. + * @param newBytes New value bytes. + * @param oldVal Old value. + * @param oldBytes Old value bytes. + * @throws IgniteCheckedException In case of error. + */ + public void onEntryUpdated(GridCacheEntryEx<K, V> e, K key, V newVal, GridCacheValueBytes newBytes, + V oldVal, GridCacheValueBytes oldBytes) throws IgniteCheckedException { + assert e != null; + assert key != null; + + ConcurrentMap<UUID, CacheContinuousQueryListener<K, V>> lsnrCol; + + if (e.isInternal()) + lsnrCol = intLsnrCnt.get() > 0 ? intLsnrs : null; + else + lsnrCol = lsnrCnt.get() > 0 ? lsnrs : null; + + if (F.isEmpty(lsnrCol)) + return; + + boolean hasNewVal = newVal != null || (newBytes != null && !newBytes.isNull()); + boolean hasOldVal = oldVal != null || (oldBytes != null && !oldBytes.isNull()); + + if (!hasNewVal && !hasOldVal) + return; + + EventType evtType = !hasNewVal ? REMOVED : !hasOldVal ? CREATED : UPDATED; + + boolean initialized = false; + + boolean primary = cctx.affinity().primary(cctx.localNode(), key, -1); + boolean recordIgniteEvt = !e.isInternal() && cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ); + + for (CacheContinuousQueryListener<K, V> lsnr : lsnrCol.values()) { + if (!initialized) { + if (lsnr.oldValueRequired()) { + oldVal = cctx.unwrapTemporary(oldVal); + + if (oldVal == null && oldBytes != null && !oldBytes.isNull()) + oldVal = oldBytes.isPlain() ? (V)oldBytes.get() : cctx.marshaller().<V>unmarshal(oldBytes.get + (), cctx.deploy().globalLoader()); + } + + if (newVal == null && newBytes != null && !newBytes.isNull()) + newVal = newBytes.isPlain() ? (V)newBytes.get() : cctx.marshaller().<V>unmarshal(newBytes.get(), + cctx.deploy().globalLoader()); + } + + CacheContinuousQueryEntry<K, V> e0 = new CacheContinuousQueryEntry<>(key, newVal, newBytes, + lsnr.oldValueRequired() ? oldVal : null, lsnr.oldValueRequired() ? oldBytes : null); + + CacheContinuousQueryEvent<K, V> evt = new CacheContinuousQueryEvent<>( + cctx.kernalContext().cache().jcache(cctx.name()), evtType, e0); + + lsnr.onEntryUpdated(evt, primary, recordIgniteEvt); + } + } + + /** + * @param e Entry. + * @param key Key. + * @param oldVal Old value. + * @param oldBytes Old value bytes. + * @throws IgniteCheckedException In case of error. + */ + public void onEntryExpired(GridCacheEntryEx<K, V> e, K key, V oldVal, GridCacheValueBytes oldBytes) + throws IgniteCheckedException { + assert e != null; + assert key != null; + + if (e.isInternal()) + return; + + ConcurrentMap<UUID, CacheContinuousQueryListener<K, V>> lsnrCol = lsnrs; + + if (F.isEmpty(lsnrCol)) + return; + + if (cctx.isReplicated() || cctx.affinity().primary(cctx.localNode(), key, -1)) { + boolean primary = cctx.affinity().primary(cctx.localNode(), key, -1); + boolean recordIgniteEvt = cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ); + + boolean initialized = false; + + for (CacheContinuousQueryListener<K, V> lsnr : lsnrCol.values()) { + if (!initialized) { + if (lsnr.oldValueRequired()) { + oldVal = cctx.unwrapTemporary(oldVal); + + if (oldVal == null && oldBytes != null && !oldBytes.isNull()) + oldVal = oldBytes.isPlain() ? (V)oldBytes.get() : + cctx.marshaller().<V>unmarshal(oldBytes.get(), cctx.deploy().globalLoader()); + } + } + + CacheContinuousQueryEntry<K, V> e0 = new CacheContinuousQueryEntry<>(key, null, null, + lsnr.oldValueRequired() ? oldVal : null, lsnr.oldValueRequired() ? oldBytes : null); + + CacheContinuousQueryEvent<K, V> evt = new CacheContinuousQueryEvent<>( + cctx.kernalContext().cache().jcache(cctx.name()), EXPIRED, e0); + + lsnr.onEntryUpdated(evt, primary, recordIgniteEvt); + } + } + } + + /** + * @param locLsnr Local listener. + * @param rmtFilter Remote filter. + * @param bufSize Buffer size. + * @param timeInterval Time interval. + * @param autoUnsubscribe Auto unsubscribe flag. + * @param grp Cluster group. + * @return Continuous routine ID. + * @throws IgniteCheckedException In case of error. + */ + public UUID executeQuery(CacheEntryUpdatedListener<K, V> locLsnr, CacheEntryEventFilter<K, V> rmtFilter, + int bufSize, long timeInterval, boolean autoUnsubscribe, ClusterGroup grp) throws IgniteCheckedException { + return executeQuery0( + locLsnr, + rmtFilter, + bufSize, + timeInterval, + autoUnsubscribe, + false, + true, + false, + true, + grp); + } + + /** + * @param locLsnr Local listener. + * @param rmtFilter Remote filter. + * @param loc Local flag. + * @return Continuous routine ID. + * @throws IgniteCheckedException In case of error. + */ + public UUID executeInternalQuery(CacheEntryUpdatedListener<K, V> locLsnr, CacheEntryEventFilter<K, V> rmtFilter, + boolean loc) throws IgniteCheckedException { + return executeQuery0( + locLsnr, + rmtFilter, + ContinuousQuery.DFLT_BUF_SIZE, + ContinuousQuery.DFLT_TIME_INTERVAL, + ContinuousQuery.DFLT_AUTO_UNSUBSCRIBE, + true, + true, + false, + true, + loc ? cctx.grid().forLocal() : null); + } + + public void cancelInternalQuery(UUID routineId) { + try { + cctx.kernalContext().continuous().stopRoutine(routineId).get(); + } + catch (IgniteCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to stop internal continuous query: " + e.getMessage()); + } + } + + /** + * @param cfg Listener configuration. + * @param onStart Whether listener is created on node start. + * @throws IgniteCheckedException + */ + public void executeJCacheQuery(CacheEntryListenerConfiguration<K, V> cfg, boolean onStart) + throws IgniteCheckedException { + JCacheQuery lsnr = new JCacheQuery(cfg, onStart); + + JCacheQuery old = jCacheLsnrs.putIfAbsent(cfg, lsnr); + + if (old != null) + throw new IllegalArgumentException("Listener is already registered for configuration: " + cfg); + + try { + lsnr.execute(); + } + catch (IgniteCheckedException e) { + cancelJCacheQuery(cfg); + + throw e; + } + } + + /** + * @param cfg Listener configuration. + * @throws IgniteCheckedException In case of error. + */ + public void cancelJCacheQuery(CacheEntryListenerConfiguration<K, V> cfg) throws IgniteCheckedException { + JCacheQuery lsnr = jCacheLsnrs.remove(cfg); + + if (lsnr != null) + lsnr.cancel(); + } + + /** + * @param locLsnr Local listener. + * @param rmtFilter Remote filter. + * @param bufSize Buffer size. + * @param timeInterval Time interval. + * @param autoUnsubscribe Auto unsubscribe flag. + * @param internal Internal flag. + * @param oldValRequired Old value required flag. + * @param sync Synchronous flag. + * @param ignoreExpired Ignore expired event flag. + * @param grp Cluster group. + * @return Continuous routine ID. + * @throws IgniteCheckedException In case of error. + */ + private UUID executeQuery0(CacheEntryUpdatedListener<K, V> locLsnr, CacheEntryEventFilter<K, V> rmtFilter, + int bufSize, long timeInterval, boolean autoUnsubscribe, boolean internal, boolean oldValRequired, + boolean sync, boolean ignoreExpired, ClusterGroup grp) throws IgniteCheckedException { + cctx.checkSecurity(GridSecurityPermission.CACHE_READ); + + if (grp == null) + grp = cctx.kernalContext().grid(); + + Collection<ClusterNode> nodes = grp.nodes(); + + if (nodes.isEmpty()) + throw new ClusterTopologyException("Failed to execute continuous query (empty cluster group is " + + "provided)."); + + boolean skipPrimaryCheck = false; + + switch (cctx.config().getCacheMode()) { + case LOCAL: + if (!nodes.contains(cctx.localNode())) + throw new ClusterTopologyException("Continuous query for LOCAL cache can be executed " + + "only locally (provided projection contains remote nodes only)."); + else if (nodes.size() > 1) + U.warn(log, "Continuous query for LOCAL cache will be executed locally (provided projection is " + + "ignored)."); + + grp = grp.forNode(cctx.localNode()); + + break; + + case REPLICATED: + if (nodes.size() == 1 && F.first(nodes).equals(cctx.localNode())) { + CacheDistributionMode distributionMode = cctx.config().getDistributionMode(); + + if (distributionMode == PARTITIONED_ONLY || distributionMode == NEAR_PARTITIONED) + skipPrimaryCheck = true; + } + + break; + } + + int taskNameHash = !internal && cctx.kernalContext().security().enabled() ? + cctx.kernalContext().job().currentTaskNameHash() : 0; + + GridContinuousHandler hnd = new CacheContinuousQueryHandler<>( + cctx.name(), + TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()), + locLsnr, + rmtFilter, + internal, + oldValRequired, + sync, + ignoreExpired, + taskNameHash, + skipPrimaryCheck); + + return cctx.kernalContext().continuous().startRoutine(hnd, bufSize, timeInterval, + autoUnsubscribe, grp.predicate()).get(); + } + + /** + * @param lsnrId Listener ID. + * @param lsnr Listener. + * @param internal Internal flag. + * @return Whether listener was actually registered. + */ + boolean registerListener(UUID lsnrId, + CacheContinuousQueryListener<K, V> lsnr, + boolean internal) { + boolean added; + + if (internal) { + added = intLsnrs.putIfAbsent(lsnrId, lsnr) == null; + + if (added) + intLsnrCnt.incrementAndGet(); + } + else { + added = lsnrs.putIfAbsent(lsnrId, lsnr) == null; + + if (added) { + lsnrCnt.incrementAndGet(); + + lsnr.onExecution(); + } + } + + return added; + } + + /** + * @param internal Internal flag. + * @param id Listener ID. + */ + void unregisterListener(boolean internal, UUID id) { + CacheContinuousQueryListener<K, V> lsnr; + + if (internal) { + if ((lsnr = intLsnrs.remove(id)) != null) { + intLsnrCnt.decrementAndGet(); + + lsnr.onUnregister(); + } + } + else { + if ((lsnr = lsnrs.remove(id)) != null) { + lsnrCnt.decrementAndGet(); + + lsnr.onUnregister(); + } + } + } + + /** + */ + private class JCacheQuery { + /** */ + private final CacheEntryListenerConfiguration<K, V> cfg; + + /** */ + private final boolean onStart; + + /** */ + private volatile UUID routineId; + + /** + * @param cfg Listener configuration. + */ + private JCacheQuery(CacheEntryListenerConfiguration<K, V> cfg, boolean onStart) { + this.cfg = cfg; + this.onStart = onStart; + } + + /** + * @throws IgniteCheckedException In case of error. + */ + @SuppressWarnings("unchecked") + void execute() throws IgniteCheckedException { + if (!onStart) + cctx.config().addCacheEntryListenerConfiguration(cfg); + + CacheEntryListener<? super K, ? super V> locLsnrImpl = cfg.getCacheEntryListenerFactory().create(); + + if (locLsnrImpl == null) + throw new IgniteCheckedException("Local CacheEntryListener is mandatory and can't be null."); + + byte types = 0; + + types |= locLsnrImpl instanceof CacheEntryCreatedListener ? CREATED_FLAG : 0; + types |= locLsnrImpl instanceof CacheEntryUpdatedListener ? UPDATED_FLAG : 0; + types |= locLsnrImpl instanceof CacheEntryRemovedListener ? REMOVED_FLAG : 0; + types |= locLsnrImpl instanceof CacheEntryExpiredListener ? EXPIRED_FLAG : 0; + + if (types == 0) + throw new IgniteCheckedException("Listener must implement one of CacheEntryListener sub-interfaces."); + + CacheEntryUpdatedListener<K, V> locLsnr = (CacheEntryUpdatedListener<K, V>)new JCacheQueryLocalListener( + locLsnrImpl, cctx.kernalContext().cache().jcache(cctx.name())); + + CacheEntryEventFilter<K, V> rmtFilter = (CacheEntryEventFilter<K, V>)new JCacheQueryRemoteFilter<>( + cfg.getCacheEntryEventFilterFactory() != null ? cfg.getCacheEntryEventFilterFactory().create() : null, + types); + + routineId = executeQuery0( + locLsnr, + rmtFilter, + ContinuousQuery.DFLT_BUF_SIZE, + ContinuousQuery.DFLT_TIME_INTERVAL, + ContinuousQuery.DFLT_AUTO_UNSUBSCRIBE, + false, + cfg.isOldValueRequired(), + cfg.isSynchronous(), + false, + null); + } + + /** + * @throws IgniteCheckedException In case of error. + */ + @SuppressWarnings("unchecked") + void cancel() throws IgniteCheckedException { + UUID routineId0 = routineId; + + assert routineId0 != null; + + cctx.kernalContext().continuous().stopRoutine(routineId0).get(); + + cctx.config().removeCacheEntryListenerConfiguration(cfg); + } + } + + /** + */ + private static class JCacheQueryLocalListener<K, V> implements CacheEntryUpdatedListener<K, V> { + /** */ + private final CacheEntryListener<K, V> impl; + + /** */ + private final Cache<K, V> cache; + + /** + * @param impl Listener. + */ + JCacheQueryLocalListener(CacheEntryListener<K, V> impl, Cache<K, V> cache) { + assert impl != null; + assert cache != null; + + this.impl = impl; + this.cache = cache; + } + + /** {@inheritDoc} */ + @Override public void onUpdated(Iterable<CacheEntryEvent<? extends K, ? extends V>> evts) { + for (CacheEntryEvent<? extends K, ? extends V> evt : evts) { + switch (evt.getEventType()) { + case CREATED: + assert impl instanceof CacheEntryCreatedListener; + + ((CacheEntryCreatedListener<K, V>)impl).onCreated(singleton(evt)); + + break; + + case UPDATED: + assert impl instanceof CacheEntryUpdatedListener; + + ((CacheEntryUpdatedListener<K, V>)impl).onUpdated(singleton(evt)); + + break; + + case REMOVED: + assert impl instanceof CacheEntryRemovedListener; + + ((CacheEntryRemovedListener<K, V>)impl).onRemoved(singleton(evt)); + + break; + + case EXPIRED: + assert impl instanceof CacheEntryExpiredListener; + + ((CacheEntryExpiredListener<K, V>)impl).onExpired(singleton(evt)); + + break; + + default: + throw new IllegalStateException("Unknown type: " + evt.getEventType()); + } + } + } + + /** + * @param evt Event. + * @return Singleton iterable. + */ + @SuppressWarnings("unchecked") + private Iterable<CacheEntryEvent<? extends K, ? extends V>> singleton( + CacheEntryEvent<? extends K, ? extends V> evt) { + assert evt instanceof CacheContinuousQueryEvent; + + Collection<CacheEntryEvent<? extends K, ? extends V>> evts = new ArrayList<>(1); + + evts.add(new CacheContinuousQueryEvent<>(cache, evt.getEventType(), + ((CacheContinuousQueryEvent<? extends K, ? extends V>)evt).entry())); + + return evts; + } + } + + /** + */ + private static class JCacheQueryRemoteFilter<K, V> implements CacheEntryEventFilter<K, V>, Externalizable { + /** */ + private CacheEntryEventFilter<K, V> impl; + + /** */ + private byte types; + + /** + * For {@link Externalizable}. + */ + public JCacheQueryRemoteFilter() { + // no-op. + } + + /** + * @param impl Filter. + * @param types Types. + */ + JCacheQueryRemoteFilter(CacheEntryEventFilter<K, V> impl, byte types) { + assert types != 0; + + this.impl = impl; + this.types = types; + } + + /** {@inheritDoc} */ + @Override public boolean evaluate(CacheEntryEvent<? extends K, ? extends V> evt) { + return (types & flag(evt.getEventType())) != 0 && (impl == null || impl.evaluate(evt)); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(impl); + out.writeByte(types); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + impl = (CacheEntryEventFilter<K, V>)in.readObject(); + types = in.readByte(); + } + + /** + * @param evtType Type. + * @return Flag value. + */ + private byte flag(EventType evtType) { + switch (evtType) { + case CREATED: + return CREATED_FLAG; + + case UPDATED: + return UPDATED_FLAG; + + case REMOVED: + return REMOVED_FLAG; + + case EXPIRED: + return EXPIRED_FLAG; + + default: + throw new IllegalStateException("Unknown type: " + evtType); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f649be2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAdapter.java deleted file mode 100644 index 528bde6..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAdapter.java +++ /dev/null @@ -1,319 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.query.continuous; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.cache.query.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.internal.cluster.*; -import org.apache.ignite.internal.processors.cache.*; -import org.apache.ignite.internal.processors.continuous.*; -import org.apache.ignite.internal.util.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.plugin.security.*; -import org.jetbrains.annotations.*; - -import javax.cache.*; -import javax.cache.event.*; -import java.util.*; -import java.util.concurrent.locks.*; - -import static org.apache.ignite.cache.CacheDistributionMode.*; - -/** - * Continuous query implementation. - */ -public class GridCacheContinuousQueryAdapter<K, V> implements CacheContinuousQuery<K, V> { - /** Guard. */ - private final GridBusyLock guard = new GridBusyLock(); - - /** Close lock. */ - private final Lock closeLock = new ReentrantLock(); - - /** Cache context. */ - private final GridCacheContext<K, V> ctx; - - /** Topic for ordered messages. */ - private final Object topic; - - /** Projection predicate */ - private final IgnitePredicate<Cache.Entry<K, V>> prjPred; - - /** Keep portable flag. */ - private final boolean keepPortable; - - /** Logger. */ - private final IgniteLogger log; - - /** Local callback. */ - private volatile IgniteBiPredicate<UUID, Collection<Map.Entry<K, V>>> cb; - - /** Local callback. */ - private volatile IgniteBiPredicate<UUID, Collection<CacheContinuousQueryEntry<K, V>>> locCb; - - /** Filter. */ - private volatile IgniteBiPredicate<K, V> filter; - - /** Remote filter. */ - private volatile IgnitePredicate<CacheContinuousQueryEntry<K, V>> rmtFilter; - - /** Buffer size. */ - private volatile int bufSize = DFLT_BUF_SIZE; - - /** Time interval. */ - @SuppressWarnings("RedundantFieldInitialization") - private volatile long timeInterval = DFLT_TIME_INTERVAL; - - /** Automatic unsubscribe flag. */ - private volatile boolean autoUnsubscribe = DFLT_AUTO_UNSUBSCRIBE; - - /** Continuous routine ID. */ - private UUID routineId; - - /** - * @param ctx Cache context. - * @param topic Topic for ordered messages. - * @param prjPred Projection predicate. - */ - GridCacheContinuousQueryAdapter(GridCacheContext<K, V> ctx, Object topic, - @Nullable IgnitePredicate<Cache.Entry<K, V>> prjPred) { - assert ctx != null; - assert topic != null; - - this.ctx = ctx; - this.topic = topic; - this.prjPred = prjPred; - - keepPortable = ctx.keepPortable(); - - log = ctx.logger(getClass()); - } - - /** {@inheritDoc} */ - @Override public void localCallback(IgniteBiPredicate<UUID, Collection<CacheContinuousQueryEntry<K, V>>> locCb) { - if (!guard.enterBusy()) - throw new IllegalStateException("Continuous query can't be changed after it was executed."); - - try { - this.locCb = locCb; - } - finally { - guard.leaveBusy(); - } - } - - /** {@inheritDoc} */ - @Nullable @Override public IgniteBiPredicate<UUID, Collection<CacheContinuousQueryEntry<K, V>>> localCallback() { - return locCb; - } - - /** {@inheritDoc} */ - @Override public void remoteFilter(@Nullable IgnitePredicate<CacheContinuousQueryEntry<K, V>> rmtFilter) { - if (!guard.enterBusy()) - throw new IllegalStateException("Continuous query can't be changed after it was executed."); - - try { - this.rmtFilter = rmtFilter; - } - finally { - guard.leaveBusy(); - } - } - - /** {@inheritDoc} */ - @Nullable @Override public IgnitePredicate<CacheContinuousQueryEntry<K, V>> remoteFilter() { - return rmtFilter; - } - - /** {@inheritDoc} */ - @Override public void bufferSize(int bufSize) { - A.ensure(bufSize > 0, "bufSize > 0"); - - if (!guard.enterBusy()) - throw new IllegalStateException("Continuous query can't be changed after it was executed."); - - try { - this.bufSize = bufSize; - } - finally { - guard.leaveBusy(); - } - } - - /** {@inheritDoc} */ - @Override public int bufferSize() { - return bufSize; - } - - /** {@inheritDoc} */ - @Override public void timeInterval(long timeInterval) { - A.ensure(timeInterval >= 0, "timeInterval >= 0"); - - if (!guard.enterBusy()) - throw new IllegalStateException("Continuous query can't be changed after it was executed."); - - try { - this.timeInterval = timeInterval; - } - finally { - guard.leaveBusy(); - } - } - - /** {@inheritDoc} */ - @Override public long timeInterval() { - return timeInterval; - } - - /** {@inheritDoc} */ - @Override public void autoUnsubscribe(boolean autoUnsubscribe) { - this.autoUnsubscribe = autoUnsubscribe; - } - - /** {@inheritDoc} */ - @Override public boolean isAutoUnsubscribe() { - return autoUnsubscribe; - } - - /** {@inheritDoc} */ - @Override public void execute() throws IgniteCheckedException { - execute(null, false, false, false, true); - } - - /** {@inheritDoc} */ - @Override public void execute(@Nullable ClusterGroup prj) throws IgniteCheckedException { - execute(prj, false, false, false, true); - } - - /** - * Starts continuous query execution. - * - * @param prj Grid projection. - * @param internal If {@code true} then query notified about internal entries updates. - * @param entryLsnr {@code True} if query created for {@link CacheEntryListener}. - * @param sync {@code True} if query created for synchronous {@link CacheEntryListener}. - * @param oldVal {@code True} if old value is required. - * @throws IgniteCheckedException If failed. - */ - public void execute(@Nullable ClusterGroup prj, - boolean internal, - boolean entryLsnr, - boolean sync, - boolean oldVal) throws IgniteCheckedException { - if (locCb == null) - throw new IllegalStateException("Mandatory local callback is not set for the query: " + this); - - ctx.checkSecurity(GridSecurityPermission.CACHE_READ); - - if (prj == null) - prj = ctx.grid(); - - prj = prj.forCacheNodes(ctx.name()); - - if (prj.nodes().isEmpty()) - throw new ClusterTopologyCheckedException("Failed to continuous execute query (projection is empty): " + - this); - - boolean skipPrimaryCheck = false; - - Collection<ClusterNode> nodes = prj.nodes(); - - if (nodes.isEmpty()) - throw new ClusterTopologyCheckedException("Failed to execute continuous query (empty projection is " + - "provided): " + this); - - switch (ctx.config().getCacheMode()) { - case LOCAL: - if (!nodes.contains(ctx.localNode())) - throw new ClusterTopologyCheckedException("Continuous query for LOCAL cache can be executed " + - "only locally (provided projection contains remote nodes only): " + this); - else if (nodes.size() > 1) - U.warn(log, "Continuous query for LOCAL cache will be executed locally (provided projection is " + - "ignored): " + this); - - prj = prj.forNode(ctx.localNode()); - - break; - - case REPLICATED: - if (nodes.size() == 1 && F.first(nodes).equals(ctx.localNode())) { - CacheDistributionMode distributionMode = ctx.config().getDistributionMode(); - - if (distributionMode == PARTITIONED_ONLY || distributionMode == NEAR_PARTITIONED) - skipPrimaryCheck = true; - } - - break; - } - - closeLock.lock(); - - try { - if (routineId != null) - throw new IllegalStateException("Continuous query can't be executed twice."); - - guard.block(); - - int taskNameHash = - ctx.kernalContext().security().enabled() ? ctx.kernalContext().job().currentTaskNameHash() : 0; - - GridContinuousHandler hnd = new GridCacheContinuousQueryHandler<>(ctx.name(), - topic, - locCb, - rmtFilter, - prjPred, - internal, - entryLsnr, - sync, - oldVal, - skipPrimaryCheck, - taskNameHash, - keepPortable); - - routineId = ctx.kernalContext().continuous().startRoutine(hnd, - bufSize, - timeInterval, - autoUnsubscribe, - prj.predicate()).get(); - } - finally { - closeLock.unlock(); - } - } - - /** {@inheritDoc} */ - @Override public void close() throws IgniteCheckedException { - closeLock.lock(); - - try { - if (routineId != null) - ctx.kernalContext().continuous().stopRoutine(routineId).get(); - } - finally { - closeLock.unlock(); - } - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridCacheContinuousQueryAdapter.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f649be2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryEntry.java deleted file mode 100644 index fcfd3b8..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryEntry.java +++ /dev/null @@ -1,344 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.query.continuous; - -import org.apache.ignite.*; -import org.apache.ignite.cache.query.*; -import org.apache.ignite.internal.managers.deployment.*; -import org.apache.ignite.internal.processors.cache.*; -import org.apache.ignite.internal.util.tostring.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.marshaller.*; -import org.jetbrains.annotations.*; - -import javax.cache.*; -import javax.cache.event.*; -import java.io.*; - -import static org.apache.ignite.internal.processors.cache.CacheFlag.*; -import static org.apache.ignite.internal.processors.cache.GridCacheValueBytes.*; - -/** - * Entry implementation. - */ -@SuppressWarnings("TypeParameterHidesVisibleType") -public class GridCacheContinuousQueryEntry<K, V> implements Cache.Entry<K, V>, GridCacheDeployable, Externalizable, - CacheContinuousQueryEntry<K, V> { - /** */ - private static final long serialVersionUID = 0L; - - /** Event type enum values. */ - private static final EventType[] EVT_TYPE_VALS = EventType.values(); - - /** Cache context. */ - @SuppressWarnings("TransientFieldNotInitialized") - @GridToStringExclude - private final transient GridCacheContext<K, V> ctx; - - /** Cache entry. */ - @SuppressWarnings("TransientFieldNotInitialized") - @GridToStringExclude - private final transient Cache.Entry<K, V> impl; - - /** Key. */ - @GridToStringInclude - private K key; - - /** New value. */ - @GridToStringInclude - private V newVal; - - /** Old value. */ - @GridToStringInclude - private V oldVal; - - /** Serialized key. */ - private byte[] keyBytes; - - /** Serialized value. */ - @GridToStringExclude - private GridCacheValueBytes newValBytes; - - /** Serialized value. */ - @GridToStringExclude - private GridCacheValueBytes oldValBytes; - - /** Cache name. */ - private String cacheName; - - /** Deployment info. */ - @GridToStringExclude - private GridDeploymentInfo depInfo; - - /** */ - private EventType evtType; - - /** - * Required by {@link Externalizable}. - */ - public GridCacheContinuousQueryEntry() { - ctx = null; - impl = null; - } - - /** - * @param ctx Cache context. - * @param impl Cache entry. - * @param key Key. - * @param newVal Value. - * @param newValBytes Value bytes. - * @param oldVal Old value. - * @param oldValBytes Old value bytes. - * @param evtType Event type. - */ - GridCacheContinuousQueryEntry(GridCacheContext<K, V> ctx, - Cache.Entry<K, V> impl, - K key, - @Nullable V newVal, - @Nullable GridCacheValueBytes newValBytes, - @Nullable V oldVal, - @Nullable GridCacheValueBytes oldValBytes, - EventType evtType) { - assert ctx != null; - assert impl != null; - assert key != null; - assert evtType != null; - - this.ctx = ctx; - this.impl = impl; - this.key = key; - this.newVal = newVal; - this.newValBytes = newValBytes; - this.oldVal = oldVal; - this.oldValBytes = oldValBytes; - this.evtType = evtType; - } - - /** - * @return Cache entry. - */ - Cache.Entry<K, V> entry() { - return impl; - } - - /** - * @return Cache context. - */ - GridCacheContext<K, V> context() { - return ctx; - } - - /** - * @return New value bytes. - */ - GridCacheValueBytes newValueBytes() { - return newValBytes; - } - - /** - * @return {@code True} if old value is set. - */ - boolean hasOldValue() { - return oldVal != null || (oldValBytes != null && !oldValBytes.isNull()); - } - - /** - * @return {@code True} if entry expired. - */ - public EventType eventType() { - return evtType; - } - - /** - * Unmarshals value from bytes if needed. - * - * @param marsh Marshaller. - * @param ldr Class loader. - * @throws IgniteCheckedException In case of error. - */ - void initValue(Marshaller marsh, @Nullable ClassLoader ldr) throws IgniteCheckedException { - assert marsh != null; - - if (newVal == null && newValBytes != null && !newValBytes.isNull()) - newVal = newValBytes.isPlain() ? (V)newValBytes.get() : marsh.<V>unmarshal(newValBytes.get(), ldr); - } - - /** - * @return Cache name. - */ - String cacheName() { - return cacheName; - } - - /** - * @param cacheName New cache name. - */ - void cacheName(String cacheName) { - this.cacheName = cacheName; - } - - /** - * @param marsh Marshaller. - * @throws IgniteCheckedException In case of error. - */ - void p2pMarshal(Marshaller marsh) throws IgniteCheckedException { - assert marsh != null; - - assert key != null; - - keyBytes = marsh.marshal(key); - - if (newValBytes == null || newValBytes.isNull()) - newValBytes = newVal != null ? - newVal instanceof byte[] ? plain(newVal) : marshaled(marsh.marshal(newVal)) : null; - - if (oldValBytes == null || oldValBytes.isNull()) - oldValBytes = oldVal != null ? - oldVal instanceof byte[] ? plain(oldVal) : marshaled(marsh.marshal(oldVal)) : null; - } - - /** - * @param marsh Marshaller. - * @param ldr Class loader. - * @throws IgniteCheckedException In case of error. - */ - void p2pUnmarshal(Marshaller marsh, @Nullable ClassLoader ldr) throws IgniteCheckedException { - assert marsh != null; - - assert key == null : "Key should be null: " + key; - assert newVal == null : "New value should be null: " + newVal; - assert oldVal == null : "Old value should be null: " + oldVal; - assert keyBytes != null; - - key = marsh.unmarshal(keyBytes, ldr); - - if (newValBytes != null && !newValBytes.isNull()) - newVal = newValBytes.isPlain() ? (V)newValBytes.get() : marsh.<V>unmarshal(newValBytes.get(), ldr); - - if (oldValBytes != null && !oldValBytes.isNull()) - oldVal = oldValBytes.isPlain() ? (V)oldValBytes.get() : marsh.<V>unmarshal(oldValBytes.get(), ldr); - } - - /** {@inheritDoc} */ - @Override public K getKey() { - return key; - } - - /** {@inheritDoc} */ - @Override public V getValue() { - return newVal; - } - - /** {@inheritDoc} */ - @Override public V getOldValue() { - return oldVal; - } - - /** {@inheritDoc} */ - @Override public V setValue(V val) { - ctx.denyOnFlag(READ); - - return null; - } - - /** {@inheritDoc} */ - @Override public void prepare(GridDeploymentInfo depInfo) { - this.depInfo = depInfo; - } - - /** {@inheritDoc} */ - @Override public GridDeploymentInfo deployInfo() { - return depInfo; - } - - /** {@inheritDoc} */ - @Override public <T> T unwrap(Class<T> clazz) { - if(clazz.isAssignableFrom(getClass())) - return clazz.cast(this); - - throw new IllegalArgumentException(); - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - boolean b = keyBytes != null; - - out.writeBoolean(b); - - if (b) { - U.writeByteArray(out, keyBytes); - - if (newValBytes != null && !newValBytes.isNull()) { - out.writeBoolean(true); - out.writeBoolean(newValBytes.isPlain()); - U.writeByteArray(out, newValBytes.get()); - } - else - out.writeBoolean(false); - - if (oldValBytes != null && !oldValBytes.isNull()) { - out.writeBoolean(true); - out.writeBoolean(oldValBytes.isPlain()); - U.writeByteArray(out, oldValBytes.get()); - } - else - out.writeBoolean(false); - - U.writeString(out, cacheName); - out.writeObject(depInfo); - } - else { - out.writeObject(key); - out.writeObject(newVal); - out.writeObject(oldVal); - } - - out.writeByte((byte)evtType.ordinal()); - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - boolean b = in.readBoolean(); - - if (b) { - keyBytes = U.readByteArray(in); - - if (in.readBoolean()) - newValBytes = in.readBoolean() ? plain(U.readByteArray(in)) : marshaled(U.readByteArray(in)); - - if (in.readBoolean()) - oldValBytes = in.readBoolean() ? plain(U.readByteArray(in)) : marshaled(U.readByteArray(in)); - - cacheName = U.readString(in); - depInfo = (GridDeploymentInfo)in.readObject(); - } - else { - key = (K)in.readObject(); - newVal = (V)in.readObject(); - oldVal = (V)in.readObject(); - } - - evtType = EVT_TYPE_VALS[in.readByte()]; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridCacheContinuousQueryEntry.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f649be2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryFilterEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryFilterEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryFilterEx.java deleted file mode 100644 index 7b0615d..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryFilterEx.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.query.continuous; - - -import org.apache.ignite.cache.query.*; -import org.apache.ignite.lang.*; - -/** - * Extended continuous query filter. - */ -public interface GridCacheContinuousQueryFilterEx<K, V> extends - IgnitePredicate<CacheContinuousQueryEntry<K, V>> { - /** - * Callback for query unregister event. - */ - public void onQueryUnregister(); -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f649be2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryHandler.java deleted file mode 100644 index d10c09f..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryHandler.java +++ /dev/null @@ -1,571 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.query.continuous; - -import org.apache.ignite.*; -import org.apache.ignite.cache.query.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.events.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.managers.deployment.*; -import org.apache.ignite.internal.processors.cache.*; -import org.apache.ignite.internal.processors.cache.query.*; -import org.apache.ignite.internal.processors.continuous.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; -import org.jetbrains.annotations.*; - -import javax.cache.*; -import javax.cache.event.*; -import java.io.*; -import java.util.*; - -import static org.apache.ignite.events.EventType.*; - -/** - * Continuous query handler. - */ -class GridCacheContinuousQueryHandler<K, V> implements GridContinuousHandler { - /** */ - private static final long serialVersionUID = 0L; - - /** Cache name. */ - private String cacheName; - - /** Topic for ordered messages. */ - private Object topic; - - /** Local callback. */ - @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") - private IgniteBiPredicate<UUID, Collection<CacheContinuousQueryEntry<K, V>>> cb; - - /** Filter. */ - private IgnitePredicate<CacheContinuousQueryEntry<K, V>> filter; - - /** Projection predicate */ - private IgnitePredicate<Cache.Entry<K, V>> prjPred; - - /** Deployable object for filter. */ - private DeployableObject filterDep; - - /** Deployable object for Projection predicate. */ - private DeployableObject prjPredDep; - - /** Internal flag. */ - private boolean internal; - - /** Entry listener flag. */ - private boolean entryLsnr; - - /** Synchronous listener flag. */ - private boolean sync; - - /** {@code True} if old value is required. */ - private boolean oldVal; - - /** Task name hash code. */ - private int taskHash; - - /** Keep portable flag. */ - private boolean keepPortable; - - /** Whether to skip primary check for REPLICATED cache. */ - private transient boolean skipPrimaryCheck; - - /** - * Required by {@link Externalizable}. - */ - public GridCacheContinuousQueryHandler() { - // No-op. - } - - /** - * Constructor. - * - * @param cacheName Cache name. - * @param topic Topic for ordered messages. - * @param cb Local callback. - * @param filter Filter. - * @param prjPred Projection predicate. - * @param internal If {@code true} then query is notified about internal entries updates. - * @param entryLsnr {@code True} if query created for {@link CacheEntryListener}. - * @param sync {@code True} if query created for synchronous {@link CacheEntryListener}. - * @param oldVal {@code True} if old value is required. - * @param skipPrimaryCheck Whether to skip primary check for REPLICATED cache. - * @param taskHash Task name hash code. - */ - GridCacheContinuousQueryHandler(@Nullable String cacheName, - Object topic, - IgniteBiPredicate<UUID, Collection<CacheContinuousQueryEntry<K, V>>> cb, - @Nullable IgnitePredicate<CacheContinuousQueryEntry<K, V>> filter, - @Nullable IgnitePredicate<Cache.Entry<K, V>> prjPred, - boolean internal, - boolean entryLsnr, - boolean sync, - boolean oldVal, - boolean skipPrimaryCheck, - int taskHash, - boolean keepPortable) { - assert topic != null; - assert cb != null; - assert !sync || entryLsnr; - - this.cacheName = cacheName; - this.topic = topic; - this.cb = cb; - this.filter = filter; - this.prjPred = prjPred; - this.internal = internal; - this.entryLsnr = entryLsnr; - this.sync = sync; - this.oldVal = oldVal; - this.taskHash = taskHash; - this.keepPortable = keepPortable; - this.skipPrimaryCheck = skipPrimaryCheck; - } - - /** {@inheritDoc} */ - @Override public boolean isForEvents() { - return false; - } - - /** {@inheritDoc} */ - @Override public boolean isForMessaging() { - return false; - } - - /** {@inheritDoc} */ - @Override public boolean isForQuery() { - return true; - } - - /** {@inheritDoc} */ - @Override public boolean register(final UUID nodeId, final UUID routineId, final GridKernalContext ctx) - throws IgniteCheckedException { - assert nodeId != null; - assert routineId != null; - assert ctx != null; - - if (cb != null) - ctx.resource().injectGeneric(cb); - - if (filter != null) - ctx.resource().injectGeneric(filter); - - final boolean loc = nodeId.equals(ctx.localNodeId()); - - GridCacheContinuousQueryListener<K, V> lsnr = new GridCacheContinuousQueryListener<K, V>() { - @Override public void onExecution() { - if (ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED)) { - ctx.event().record(new CacheQueryExecutedEvent<>( - ctx.discovery().localNode(), - "Continuous query executed.", - EVT_CACHE_QUERY_EXECUTED, - CacheQueryType.CONTINUOUS, - cacheName, - null, - null, - null, - filter, - null, - nodeId, - taskName() - )); - } - } - - @Override public void onEntryUpdate(GridCacheContinuousQueryEntry<K, V> e, boolean recordEvt) { - GridCacheContext<K, V> cctx = cacheContext(ctx); - - if (cctx.isReplicated() && - !skipPrimaryCheck && - !cctx.affinity().primary(cctx.localNode(), e.getKey(), cctx.topology().topologyVersion())) - return; - - boolean notify; - - CacheFlag[] f = cctx.forceLocalRead(); - - try { - notify = (prjPred == null || checkProjection(e)) && - (filter == null || filter.apply(e)); - } - finally { - cctx.forceFlags(f); - } - - if (notify) { - if (!oldVal && e.hasOldValue()) { - e = new GridCacheContinuousQueryEntry<>(e.context(), - e.entry(), - e.getKey(), - e.getValue(), - e.newValueBytes(), - null, - null, - e.eventType()); - } - - if (loc) { - if (!cb.apply(nodeId, - F.<CacheContinuousQueryEntry<K, V>>asList(e))) - ctx.continuous().stopRoutine(routineId); - } - else { - try { - ClusterNode node = ctx.discovery().node(nodeId); - - if (ctx.config().isPeerClassLoadingEnabled() && node != null && - U.hasCache(node, cacheName)) { - e.p2pMarshal(ctx.config().getMarshaller()); - - e.cacheName(cacheName); - - GridCacheDeploymentManager depMgr = - ctx.cache().internalCache(cacheName).context().deploy(); - - depMgr.prepare(e); - } - - ctx.continuous().addNotification(nodeId, routineId, e, topic, sync); - } - catch (IgniteCheckedException ex) { - U.error(ctx.log(getClass()), "Failed to send event notification to node: " + nodeId, ex); - } - } - - if (!entryLsnr && recordEvt) { - ctx.event().record(new CacheQueryReadEvent<>( - ctx.discovery().localNode(), - "Continuous query executed.", - EVT_CACHE_QUERY_OBJECT_READ, - CacheQueryType.CONTINUOUS, - cacheName, - null, - null, - null, - filter, - null, - nodeId, - taskName(), - e.getKey(), - e.getValue(), - e.getOldValue(), - null - )); - } - } - } - - /** {@inheritDoc} */ - @Override public void onUnregister() { - if (filter != null && filter instanceof GridCacheContinuousQueryFilterEx) - ((GridCacheContinuousQueryFilterEx)filter).onQueryUnregister(); - } - - private boolean checkProjection(GridCacheContinuousQueryEntry<K, V> e) { - GridCacheProjectionImpl.FullFilter<K, V> filter = (GridCacheProjectionImpl.FullFilter<K, V>)prjPred; - - GridCacheProjectionImpl.KeyValueFilter<K, V> kvFilter = filter.keyValueFilter(); - IgnitePredicate<? super Cache.Entry<K, V>> entryFilter = filter.entryFilter(); - - boolean ret = true; - - if (kvFilter != null) { - V v = e.getValue() == null ? e.getOldValue() : e.getValue(); - - ret = v != null && kvFilter.apply(e.getKey(), v); - } - - if (entryFilter != null) - ret = ret && entryFilter.apply(e); - - return ret; - } - - @Nullable private String taskName() { - return ctx.security().enabled() ? ctx.task().resolveTaskName(taskHash) : null; - } - }; - - return manager(ctx).registerListener(routineId, lsnr, internal, entryLsnr); - } - - /** {@inheritDoc} */ - @Override public void onListenerRegistered(UUID routineId, GridKernalContext ctx) { - if (!entryLsnr) - manager(ctx).iterate(internal, routineId, keepPortable); - } - - /** {@inheritDoc} */ - @Override public void unregister(UUID routineId, GridKernalContext ctx) { - assert routineId != null; - assert ctx != null; - - manager(ctx).unregisterListener(internal, routineId); - } - - /** - * @param ctx Kernal context. - * @return Continuous query manager. - */ - private GridCacheContinuousQueryManager<K, V> manager(GridKernalContext ctx) { - return cacheContext(ctx).continuousQueries(); - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public void notifyCallback(UUID nodeId, UUID routineId, Collection<?> objs, GridKernalContext ctx) { - assert nodeId != null; - assert routineId != null; - assert objs != null; - assert ctx != null; - - Collection<CacheContinuousQueryEntry<K, V>> entries = - (Collection<CacheContinuousQueryEntry<K, V>>)objs; - - if (ctx.config().isPeerClassLoadingEnabled()) { - for (Map.Entry<K, V> e : entries) { - assert e instanceof GridCacheContinuousQueryEntry; - - GridCacheContinuousQueryEntry<K, V> qe = (GridCacheContinuousQueryEntry<K, V>)e; - - GridCacheAdapter cache = ctx.cache().internalCache(qe.cacheName()); - - ClassLoader ldr = null; - - if (cache != null) { - GridCacheDeploymentManager depMgr = cache.context().deploy(); - - GridDeploymentInfo depInfo = qe.deployInfo(); - - if (depInfo != null) { - depMgr.p2pContext(nodeId, depInfo.classLoaderId(), depInfo.userVersion(), depInfo.deployMode(), - depInfo.participants(), depInfo.localDeploymentOwner()); - } - - ldr = depMgr.globalLoader(); - } - else { - U.warn(ctx.log(getClass()), "Received cache event for cache that is not configured locally " + - "when peer class loading is enabled: " + qe.cacheName() + ". Will try to unmarshal " + - "with default class loader."); - } - - try { - qe.p2pUnmarshal(ctx.config().getMarshaller(), ldr); - } - catch (IgniteCheckedException ex) { - U.error(ctx.log(getClass()), "Failed to unmarshal entry.", ex); - } - } - } - - if (!cb.apply(nodeId, entries)) - ctx.continuous().stopRoutine(routineId); - } - - /** {@inheritDoc} */ - @Override public void p2pMarshal(GridKernalContext ctx) throws IgniteCheckedException { - assert ctx != null; - assert ctx.config().isPeerClassLoadingEnabled(); - - if (filter != null && !U.isGrid(filter.getClass())) - filterDep = new DeployableObject(filter, ctx); - - if (prjPred != null && !U.isGrid(prjPred.getClass())) - prjPredDep = new DeployableObject(prjPred, ctx); - } - - /** {@inheritDoc} */ - @Override public void p2pUnmarshal(UUID nodeId, GridKernalContext ctx) throws IgniteCheckedException { - assert nodeId != null; - assert ctx != null; - assert ctx.config().isPeerClassLoadingEnabled(); - - if (filterDep != null) - filter = filterDep.unmarshal(nodeId, ctx); - - if (prjPredDep != null) - prjPred = prjPredDep.unmarshal(nodeId, ctx); - } - - /** {@inheritDoc} */ - @Nullable @Override public Object orderedTopic() { - return topic; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeString(out, cacheName); - out.writeObject(topic); - - boolean b = filterDep != null; - - out.writeBoolean(b); - - if (b) - out.writeObject(filterDep); - else - out.writeObject(filter); - - b = prjPredDep != null; - - out.writeBoolean(b); - - if (b) - out.writeObject(prjPredDep); - else - out.writeObject(prjPred); - - out.writeBoolean(internal); - - out.writeBoolean(entryLsnr); - - out.writeBoolean(sync); - - out.writeBoolean(oldVal); - - out.writeInt(taskHash); - - out.writeBoolean(keepPortable); - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - cacheName = U.readString(in); - topic = in.readObject(); - - boolean b = in.readBoolean(); - - if (b) - filterDep = (DeployableObject)in.readObject(); - else - filter = (IgnitePredicate<CacheContinuousQueryEntry<K,V>>)in.readObject(); - - b = in.readBoolean(); - - if (b) - prjPredDep = (DeployableObject)in.readObject(); - else - prjPred = (IgnitePredicate<Cache.Entry<K, V>>)in.readObject(); - - internal = in.readBoolean(); - - entryLsnr = in.readBoolean(); - - sync = in.readBoolean(); - - oldVal = in.readBoolean(); - - taskHash = in.readInt(); - - keepPortable = in.readBoolean(); - } - - /** - * @param ctx Kernal context. - * @return Cache context. - */ - private GridCacheContext<K, V> cacheContext(GridKernalContext ctx) { - assert ctx != null; - - return ctx.cache().<K, V>internalCache(cacheName).context(); - } - - /** - * Deployable object. - */ - private static class DeployableObject implements Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Serialized object. */ - private byte[] bytes; - - /** Deployment class name. */ - private String clsName; - - /** Deployment info. */ - private GridDeploymentInfo depInfo; - - /** - * Required by {@link Externalizable}. - */ - public DeployableObject() { - // No-op. - } - - /** - * @param obj Object. - * @param ctx Kernal context. - * @throws IgniteCheckedException In case of error. - */ - private DeployableObject(Object obj, GridKernalContext ctx) throws IgniteCheckedException { - assert obj != null; - assert ctx != null; - - Class cls = U.detectClass(obj); - - clsName = cls.getName(); - - GridDeployment dep = ctx.deploy().deploy(cls, U.detectClassLoader(cls)); - - if (dep == null) - throw new IgniteDeploymentCheckedException("Failed to deploy object: " + obj); - - depInfo = new GridDeploymentInfoBean(dep); - - bytes = ctx.config().getMarshaller().marshal(obj); - } - - /** - * @param nodeId Node ID. - * @param ctx Kernal context. - * @return Deserialized object. - * @throws IgniteCheckedException In case of error. - */ - <T> T unmarshal(UUID nodeId, GridKernalContext ctx) throws IgniteCheckedException { - assert ctx != null; - - GridDeployment dep = ctx.deploy().getGlobalDeployment(depInfo.deployMode(), clsName, clsName, - depInfo.userVersion(), nodeId, depInfo.classLoaderId(), depInfo.participants(), null); - - if (dep == null) - throw new IgniteDeploymentCheckedException("Failed to obtain deployment for class: " + clsName); - - return ctx.config().getMarshaller().unmarshal(bytes, dep.classLoader()); - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeByteArray(out, bytes); - U.writeString(out, clsName); - out.writeObject(depInfo); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - bytes = U.readByteArray(in); - clsName = U.readString(in); - depInfo = (GridDeploymentInfo)in.readObject(); - } - } -}