http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/41bc7728/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index b54d939,67bc5b9..1298e3d --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@@ -324,14 -325,73 +326,72 @@@ public class IgniteCacheProxy<K, V> ext } /** + * @param local Enforce local. + * @return Local node cluster group. + */ + private ClusterGroup projection(boolean local) { + return local || ctx.isLocal() || ctx.isReplicated() ? ctx.kernalContext().grid().forLocal() : null; + } + ++ /** + * Executes continuous query. + * + * @param qry Query. + * @param loc Local flag. + * @return Initial iteration cursor. + */ + private QueryCursor<Entry<K,V>> queryContinuous(ContinuousQuery<K, V> qry, boolean loc) { + if (qry.getInitialPredicate() instanceof ContinuousQuery) + throw new IgniteException("Initial predicate for continuous query can't be an instance of another " + + "continuous query. Use SCAN or SQL query for initial iteration."); + + if (qry.getLocalListener() == null) + throw new IgniteException("Mandatory local listener is not set for the query: " + qry); + + try { + final UUID routineId = ctx.continuousQueries().executeQuery( + qry.getLocalListener(), + qry.getRemoteFilter(), + qry.getBufferSize(), + qry.getTimeInterval(), + qry.isAutoUnsubscribe(), + loc ? ctx.grid().cluster().forLocal() : null); + + final QueryCursor<Cache.Entry<K, V>> cur; + + if (qry.getInitialPredicate() != null) + cur = loc ? localQuery(qry.getInitialPredicate()) : query(qry.getInitialPredicate()); + else + cur = null; + + return new QueryCursor<Cache.Entry<K, V>>() { + @Override public Iterator<Cache.Entry<K, V>> iterator() { + return cur != null ? cur.iterator() : new GridEmptyIterator<Cache.Entry<K, V>>(); + } + + @Override public List<Cache.Entry<K, V>> getAll() { + return cur != null ? cur.getAll() : Collections.<Cache.Entry<K, V>>emptyList(); + } + + @Override public void close() { + if (cur != null) + cur.close(); + + try { + ctx.kernalContext().continuous().stopRoutine(routineId).get(); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + }; + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + - /** - * @param loc Enforce local. - * @return Local node cluster group. - */ - private ClusterGroup projection(boolean loc) { - return loc ? ctx.kernalContext().grid().cluster().forLocal() : null; - } - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") @Override public QueryCursor<Entry<K,V>> query(Query qry) { A.notNull(qry, "qry");
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/41bc7728/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java index 4e1ceba,14025f4..c584b82 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java @@@ -18,6 -18,7 +18,8 @@@ package org.apache.ignite.internal.processors.cache.query; import org.apache.ignite.cache.query.*; ++import org.apache.ignite.cache.query.*; + import org.apache.ignite.cache.query.annotations.*; import org.apache.ignite.cluster.*; import org.apache.ignite.lang.*; import org.jetbrains.annotations.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/41bc7728/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueriesEx.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueriesEx.java index c1503fc,8d3c2be..738e6cf --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueriesEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueriesEx.java @@@ -18,7 -18,7 +18,8 @@@ package org.apache.ignite.internal.processors.cache.query; import org.apache.ignite.*; +import org.apache.ignite.cache.query.*; + import org.apache.ignite.internal.*; import java.util.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/41bc7728/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueriesImpl.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/41bc7728/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueriesProxy.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/41bc7728/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/41bc7728/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index 0000000,82b28cb..2aeb47b mode 000000,100644..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@@ -1,0 -1,502 +1,505 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.ignite.internal.processors.cache.query.continuous; + + import org.apache.ignite.*; + import org.apache.ignite.cluster.*; + import org.apache.ignite.events.*; + import org.apache.ignite.internal.*; ++import org.apache.ignite.internal.processors.cache.*; ++import org.apache.ignite.internal.processors.cache.query.*; ++import org.apache.ignite.lang.*; + import org.apache.ignite.internal.managers.deployment.*; + import org.apache.ignite.internal.processors.cache.*; + import org.apache.ignite.internal.processors.cache.query.*; + import org.apache.ignite.internal.processors.continuous.*; + import org.apache.ignite.internal.util.typedef.*; + import org.apache.ignite.internal.util.typedef.internal.*; + import org.jetbrains.annotations.*; + + import javax.cache.event.*; + import javax.cache.event.EventType; + import java.io.*; + import java.util.*; + + import static org.apache.ignite.events.EventType.*; + + /** + * Continuous query handler. + */ + class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { + /** */ + private static final long serialVersionUID = 0L; + + /** Cache name. */ + private String cacheName; + + /** Topic for ordered messages. */ + private Object topic; + + /** Local listener. */ + private transient CacheEntryUpdatedListener<K, V> locLsnr; + + /** Remote filter. */ + private CacheEntryEventFilter<K, V> rmtFilter; + + /** Deployable object for filter. */ + private DeployableObject rmtFilterDep; + + /** Internal flag. */ + private boolean internal; + + /** Notify existing flag. */ + private boolean notifyExisting; + + /** Old value required flag. */ + private boolean oldValRequired; + + /** Synchronous flag. */ + private boolean sync; + + /** Ignore expired events flag. */ + private boolean ignoreExpired; + + /** Task name hash code. */ + private int taskHash; + + /** Whether to skip primary check for REPLICATED cache. */ + private transient boolean skipPrimaryCheck; + + /** + * Required by {@link Externalizable}. + */ + public CacheContinuousQueryHandler() { + // No-op. + } + + /** + * Constructor. + * + * @param cacheName Cache name. + * @param topic Topic for ordered messages. + * @param locLsnr Local listener. + * @param rmtFilter Remote filter. + * @param internal Internal flag. + * @param notifyExisting Notify existing flag. + * @param oldValRequired Old value required flag. + * @param sync Synchronous flag. + * @param ignoreExpired Ignore expired events flag. + * @param skipPrimaryCheck Whether to skip primary check for REPLICATED cache. + * @param taskHash Task name hash code. + */ + public CacheContinuousQueryHandler( + String cacheName, + Object topic, + CacheEntryUpdatedListener<K, V> locLsnr, + CacheEntryEventFilter<K, V> rmtFilter, + boolean internal, + boolean notifyExisting, + boolean oldValRequired, + boolean sync, + boolean ignoreExpired, + int taskHash, + boolean skipPrimaryCheck) { + assert topic != null; + assert locLsnr != null; + + this.cacheName = cacheName; + this.topic = topic; + this.locLsnr = locLsnr; + this.rmtFilter = rmtFilter; + this.internal = internal; + this.notifyExisting = notifyExisting; + this.oldValRequired = oldValRequired; + this.sync = sync; + this.ignoreExpired = ignoreExpired; + this.taskHash = taskHash; + this.skipPrimaryCheck = skipPrimaryCheck; + } + + /** {@inheritDoc} */ + @Override public boolean isForEvents() { + return false; + } + + /** {@inheritDoc} */ + @Override public boolean isForMessaging() { + return false; + } + + /** {@inheritDoc} */ + @Override public boolean isForQuery() { + return true; + } + + /** {@inheritDoc} */ + @Override public boolean register(final UUID nodeId, final UUID routineId, final GridKernalContext ctx) + throws IgniteCheckedException { + assert nodeId != null; + assert routineId != null; + assert ctx != null; + + if (locLsnr != null) + ctx.resource().injectGeneric(locLsnr); + + if (rmtFilter != null) + ctx.resource().injectGeneric(rmtFilter); + + final boolean loc = nodeId.equals(ctx.localNodeId()); + + CacheContinuousQueryListener<K, V> lsnr = new CacheContinuousQueryListener<K, V>() { + @Override public void onExecution() { + if (ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED)) { + ctx.event().record(new CacheQueryExecutedEvent<>( + ctx.discovery().localNode(), + "Continuous query executed.", + EVT_CACHE_QUERY_EXECUTED, + CacheQueryType.CONTINUOUS, + cacheName, + null, + null, + null, + rmtFilter, + null, + nodeId, + taskName() + )); + } + } + + @Override public void onEntryUpdated(CacheContinuousQueryEvent<K, V> evt, boolean primary, + boolean recordIgniteEvt) { + if (ignoreExpired && evt.getEventType() == EventType.EXPIRED) + return; + + GridCacheContext<K, V> cctx = cacheContext(ctx); + + if (cctx.isReplicated() && !skipPrimaryCheck && !primary) + return; + + boolean notify = true; + + if (rmtFilter != null) { + CacheFlag[] f = cctx.forceLocalRead(); + + try { + notify = rmtFilter.evaluate(evt); + } + finally { + cctx.forceFlags(f); + } + } + + if (notify) { + if (loc) + locLsnr.onUpdated(F.<CacheEntryEvent<? extends K, ? extends V>>asList(evt)); + else { + try { + ClusterNode node = ctx.discovery().node(nodeId); + + if (ctx.config().isPeerClassLoadingEnabled() && node != null && + U.hasCache(node, cacheName)) { + evt.entry().p2pMarshal(ctx.config().getMarshaller()); + + evt.entry().cacheName(cacheName); + + GridCacheDeploymentManager depMgr = + ctx.cache().internalCache(cacheName).context().deploy(); + + depMgr.prepare(evt.entry()); + } + + ctx.continuous().addNotification(nodeId, routineId, evt, topic, sync); + } + catch (IgniteCheckedException ex) { + U.error(ctx.log(getClass()), "Failed to send event notification to node: " + nodeId, ex); + } + } + + if (recordIgniteEvt) { + ctx.event().record(new CacheQueryReadEvent<>( + ctx.discovery().localNode(), + "Continuous query executed.", + EVT_CACHE_QUERY_OBJECT_READ, + CacheQueryType.CONTINUOUS, + cacheName, + null, + null, + null, + rmtFilter, + null, + nodeId, + taskName(), + evt.getKey(), + evt.getValue(), + evt.getOldValue(), + null + )); + } + } + } + + @Override public void onUnregister() { + if (rmtFilter instanceof CacheContinuousQueryFilterEx) + ((CacheContinuousQueryFilterEx)rmtFilter).onQueryUnregister(); + } + + @Override public boolean oldValueRequired() { + return oldValRequired; + } + + @Override public boolean notifyExisting() { + return notifyExisting; + } + + private String taskName() { + return ctx.security().enabled() ? ctx.task().resolveTaskName(taskHash) : null; + } + }; + + return manager(ctx).registerListener(routineId, lsnr, internal); + } + + /** {@inheritDoc} */ + @Override public void onListenerRegistered(UUID routineId, GridKernalContext ctx) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void unregister(UUID routineId, GridKernalContext ctx) { + assert routineId != null; + assert ctx != null; + + manager(ctx).unregisterListener(internal, routineId); + } + + /** + * @param ctx Kernal context. + * @return Continuous query manager. + */ + private CacheContinuousQueryManager<K, V> manager(GridKernalContext ctx) { + return cacheContext(ctx).continuousQueries(); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public void notifyCallback(UUID nodeId, UUID routineId, Collection<?> objs, GridKernalContext ctx) { + assert nodeId != null; + assert routineId != null; + assert objs != null; + assert ctx != null; + + Collection<CacheEntryEvent<? extends K, ? extends V>> evts = + (Collection<CacheEntryEvent<? extends K, ? extends V>>)objs; + + if (ctx.config().isPeerClassLoadingEnabled()) { + for (CacheEntryEvent<? extends K, ? extends V> evt : evts) { + assert evt instanceof CacheContinuousQueryEvent; + + CacheContinuousQueryEntry<? extends K, ? extends V> e = ((CacheContinuousQueryEvent)evt).entry(); + + GridCacheAdapter cache = ctx.cache().internalCache(e.cacheName()); + + ClassLoader ldr = null; + + if (cache != null) { + GridCacheDeploymentManager depMgr = cache.context().deploy(); + + GridDeploymentInfo depInfo = e.deployInfo(); + + if (depInfo != null) { + depMgr.p2pContext(nodeId, depInfo.classLoaderId(), depInfo.userVersion(), depInfo.deployMode(), + depInfo.participants(), depInfo.localDeploymentOwner()); + } + + ldr = depMgr.globalLoader(); + } + else { + U.warn(ctx.log(getClass()), "Received cache event for cache that is not configured locally " + + "when peer class loading is enabled: " + e.cacheName() + ". Will try to unmarshal " + + "with default class loader."); + } + + try { + e.p2pUnmarshal(ctx.config().getMarshaller(), ldr); + } + catch (IgniteCheckedException ex) { + U.error(ctx.log(getClass()), "Failed to unmarshal entry.", ex); + } + } + } + + locLsnr.onUpdated(evts); + } + + /** {@inheritDoc} */ + @Override public void p2pMarshal(GridKernalContext ctx) throws IgniteCheckedException { + assert ctx != null; + assert ctx.config().isPeerClassLoadingEnabled(); + + if (rmtFilter != null && !U.isGrid(rmtFilter.getClass())) + rmtFilterDep = new DeployableObject(rmtFilter, ctx); + } + + /** {@inheritDoc} */ + @Override public void p2pUnmarshal(UUID nodeId, GridKernalContext ctx) throws IgniteCheckedException { + assert nodeId != null; + assert ctx != null; + assert ctx.config().isPeerClassLoadingEnabled(); + + if (rmtFilterDep != null) + rmtFilter = rmtFilterDep.unmarshal(nodeId, ctx); + } + + /** {@inheritDoc} */ + @Nullable @Override public Object orderedTopic() { + return topic; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + U.writeString(out, cacheName); + out.writeObject(topic); + + boolean b = rmtFilterDep != null; + + out.writeBoolean(b); + + if (b) + out.writeObject(rmtFilterDep); + else + out.writeObject(rmtFilter); + + out.writeBoolean(internal); + out.writeBoolean(notifyExisting); + out.writeBoolean(oldValRequired); + out.writeBoolean(sync); + out.writeBoolean(ignoreExpired); + out.writeInt(taskHash); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + cacheName = U.readString(in); + topic = in.readObject(); + + boolean b = in.readBoolean(); + + if (b) + rmtFilterDep = (DeployableObject)in.readObject(); + else + rmtFilter = (CacheEntryEventFilter<K, V>)in.readObject(); + + internal = in.readBoolean(); + notifyExisting = in.readBoolean(); + oldValRequired = in.readBoolean(); + sync = in.readBoolean(); + ignoreExpired = in.readBoolean(); + taskHash = in.readInt(); + } + + /** + * @param ctx Kernal context. + * @return Cache context. + */ + private GridCacheContext<K, V> cacheContext(GridKernalContext ctx) { + assert ctx != null; + + return ctx.cache().<K, V>internalCache(cacheName).context(); + } + + /** + * Deployable object. + */ + private static class DeployableObject implements Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** Serialized object. */ + private byte[] bytes; + + /** Deployment class name. */ + private String clsName; + + /** Deployment info. */ + private GridDeploymentInfo depInfo; + + /** + * Required by {@link Externalizable}. + */ + public DeployableObject() { + // No-op. + } + + /** + * @param obj Object. + * @param ctx Kernal context. + * @throws IgniteCheckedException In case of error. + */ + private DeployableObject(Object obj, GridKernalContext ctx) throws IgniteCheckedException { + assert obj != null; + assert ctx != null; + + Class cls = U.detectClass(obj); + + clsName = cls.getName(); + + GridDeployment dep = ctx.deploy().deploy(cls, U.detectClassLoader(cls)); + + if (dep == null) + throw new IgniteDeploymentCheckedException("Failed to deploy object: " + obj); + + depInfo = new GridDeploymentInfoBean(dep); + + bytes = ctx.config().getMarshaller().marshal(obj); + } + + /** + * @param nodeId Node ID. + * @param ctx Kernal context. + * @return Deserialized object. + * @throws IgniteCheckedException In case of error. + */ + <T> T unmarshal(UUID nodeId, GridKernalContext ctx) throws IgniteCheckedException { + assert ctx != null; + + GridDeployment dep = ctx.deploy().getGlobalDeployment(depInfo.deployMode(), clsName, clsName, + depInfo.userVersion(), nodeId, depInfo.classLoaderId(), depInfo.participants(), null); + + if (dep == null) + throw new IgniteDeploymentCheckedException("Failed to obtain deployment for class: " + clsName); + + return ctx.config().getMarshaller().unmarshal(bytes, dep.classLoader()); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + U.writeByteArray(out, bytes); + U.writeString(out, clsName); + out.writeObject(depInfo); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + bytes = U.readByteArray(in); + clsName = U.readString(in); + depInfo = (GridDeploymentInfo)in.readObject(); + } + } + } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/41bc7728/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index 80f53c6,f503161..1e91397 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@@ -100,40 -107,17 +100,40 @@@ public class GridQueryProcessor extend idx.start(ctx); - for (CacheConfiguration ccfg : ctx.config().getCacheConfiguration()){ + for (CacheConfiguration<?, ?> ccfg : ctx.config().getCacheConfiguration()){ CacheQueryConfiguration qryCfg = ccfg.getQueryConfiguration(); - if (qryCfg != null) { - if (!F.isEmpty(ccfg.getTypeMetadata())) { - for (CacheTypeMetadata meta : ccfg.getTypeMetadata()) - declaredTypesByName.put(new TypeName(ccfg.getName(), meta.getValueType()), meta); + if (qryCfg != null && qryCfg.getTypeResolver() != null) + typeResolvers.put(ccfg.getName(), qryCfg.getTypeResolver()); + + Map<TypeName,CacheTypeMetadata> declaredTypes = new HashMap<>(); + + if (!F.isEmpty(ccfg.getTypeMetadata())) { + for (CacheTypeMetadata meta : ccfg.getTypeMetadata()) { + declaredTypes.put(new TypeName(ccfg.getName(), meta.getValueType()), meta); + + int valTypeId = ctx.portable().typeId(meta.getValueType()); + + portableIds.put(valTypeId, meta.getValueType()); + + TypeDescriptor desc = processPortableMeta(meta); + + desc.registered(idx.registerType(ccfg.getName(), desc)); + + typesByName.put(new TypeName(ccfg.getName(), desc.name()), desc); + types.put(new TypeId(ccfg.getName(), valTypeId), desc); } + } - if (qryCfg.getTypeResolver() != null) - typeResolvers.put(ccfg.getName(), qryCfg.getTypeResolver()); + if (qryCfg != null && !F.isEmpty(qryCfg.getAnnotatedEntryTypes())) { + for (IgniteBiTuple<Class<?>,Class<?>> types : qryCfg.getAnnotatedEntryTypes()) { + TypeDescriptor desc = processKeyAndValue(ccfg.getName(), types.getKey(), types.getValue(), + declaredTypes); + + desc.registered(idx.registerType(ccfg.getName(), desc)); + + typesByName.put(new TypeName(ccfg.getName(), desc.name()), desc); + } } } @@@ -842,10 -897,10 +842,10 @@@ if (sqlAnn != null || txtAnn != null) { if (mtd.getParameterTypes().length != 0) - throw new IgniteCheckedException("Getter with CacheQuerySqlField " + + throw new IgniteCheckedException("Getter with QuerySqlField " + "annotation cannot have parameters: " + mtd); - ClassProperty prop = new ClassProperty(mtd); + ClassProperty prop = new ClassProperty(mtd, key); prop.parent(parent); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/41bc7728/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheConfiguration.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/41bc7728/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentTxMultiNodeTest.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentTxMultiNodeTest.java index de64d09,6c21b28..c65477d --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentTxMultiNodeTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentTxMultiNodeTest.java @@@ -26,9 -26,9 +26,10 @@@ import org.apache.ignite.cluster.* import org.apache.ignite.compute.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.cache.query.*; import org.apache.ignite.internal.processors.cache.distributed.dht.*; import org.apache.ignite.internal.processors.cache.distributed.near.*; + import org.apache.ignite.internal.processors.cache.query.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.lang.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/41bc7728/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapSelfTest.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapSelfTest.java index 8d601b1,4846183..05695c4 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapSelfTest.java @@@ -19,8 -19,7 +19,9 @@@ package org.apache.ignite.internal.proc import org.apache.ignite.*; import org.apache.ignite.cache.*; +import org.apache.ignite.cache.query.*; +import org.apache.ignite.cache.query.annotations.*; + import org.apache.ignite.cache.query.annotations.*; import org.apache.ignite.configuration.*; import org.apache.ignite.events.*; import org.apache.ignite.internal.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/41bc7728/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheQueryEmbeddedValue.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheQueryEmbeddedValue.java index 7ecb950,b62bb7c..3784593 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheQueryEmbeddedValue.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheQueryEmbeddedValue.java @@@ -17,8 -17,7 +17,9 @@@ package org.apache.ignite.internal.processors.cache; +import org.apache.ignite.cache.query.*; +import org.apache.ignite.cache.query.annotations.*; + import org.apache.ignite.cache.query.annotations.*; import java.io.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/41bc7728/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheQueryIndexingDisabledSelfTest.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheQueryIndexingDisabledSelfTest.java index e54f464,ec68252..e0ec73d --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheQueryIndexingDisabledSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheQueryIndexingDisabledSelfTest.java @@@ -19,8 -19,8 +19,9 @@@ package org.apache.ignite.internal.proc import org.apache.ignite.*; import org.apache.ignite.cache.*; +import org.apache.ignite.internal.processors.cache.query.*; import org.apache.ignite.configuration.*; + import org.apache.ignite.internal.processors.cache.query.*; import org.apache.ignite.testframework.*; import java.util.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/41bc7728/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestValue2.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestValue2.java index 5fb90a6,c1b1acb..1cf0172 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestValue2.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestValue2.java @@@ -17,8 -17,7 +17,9 @@@ package org.apache.ignite.internal.processors.cache; +import org.apache.ignite.cache.query.*; +import org.apache.ignite.cache.query.annotations.*; + import org.apache.ignite.cache.query.annotations.*; import org.apache.ignite.internal.util.typedef.internal.*; import java.io.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/41bc7728/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheFieldsQueryNoDataSelfTest.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheFieldsQueryNoDataSelfTest.java index 40e78bf,0000000..7253aa4 mode 100644,000000..100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheFieldsQueryNoDataSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheFieldsQueryNoDataSelfTest.java @@@ -1,81 -1,0 +1,83 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.cache.query.*; +import org.apache.ignite.configuration.*; ++import org.apache.ignite.internal.*; ++import org.apache.ignite.internal.processors.cache.query.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.junits.common.*; + +import javax.cache.*; +import java.util.*; + +import static org.apache.ignite.cache.CacheMode.*; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; + +/** + * Test for local query on partitioned cache without data. + */ +public class IgniteCacheFieldsQueryNoDataSelfTest extends GridCommonAbstractTest { + /** IP finder. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + CacheConfiguration cache = defaultCacheConfiguration(); + + cache.setCacheMode(PARTITIONED); + cache.setBackups(1); + cache.setWriteSynchronizationMode(FULL_SYNC); + + cfg.setCacheConfiguration(cache); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(disco); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + startGrid(); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopGrid(); + } + + /** + * @throws Exception If failed. + */ + public void testQuery() throws Exception { + Collection<Cache.Entry<Object, Object>> res = grid().jcache(null) + .query(new SqlQuery("select _VAL from Integer")).getAll(); + + assert res != null; + assert res.isEmpty(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/41bc7728/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryIndexSelfTest.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryIndexSelfTest.java index 6c7924c,0000000..15e9576 mode 100644,000000..100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryIndexSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryIndexSelfTest.java @@@ -1,128 -1,0 +1,131 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.query.*; +import org.apache.ignite.cache.query.annotations.*; ++import org.apache.ignite.cache.query.annotations.*; ++import org.apache.ignite.internal.processors.cache.query.*; ++import org.apache.ignite.internal.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import javax.cache.*; +import java.util.*; + +import static org.apache.ignite.cache.CacheMode.*; + +/** + * Tests for cache query index. + */ +public class IgniteCacheQueryIndexSelfTest extends GridCacheAbstractSelfTest { + /** Grid count. */ + private static final int GRID_CNT = 2; + + /** Entry count. */ + private static final int ENTRY_CNT = 10; + + /** {@inheritDoc} */ + @Override protected int gridCount() { + return GRID_CNT; + } + + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return PARTITIONED; + } + + /** + * @throws Exception If failed. + */ + public void testWithoutStoreLoad() throws Exception { + IgniteCache<Integer, CacheValue> cache = grid(0).jcache(null); + + for (int i = 0; i < ENTRY_CNT; i++) + cache.put(i, new CacheValue(i)); + + checkCache(cache); + checkQuery(cache); + } + + /** + * @throws Exception If failed. + */ + public void testWithStoreLoad() throws Exception { + for (int i = 0; i < ENTRY_CNT; i++) + putToStore(i, new CacheValue(i)); + + IgniteCache<Integer, CacheValue> cache0 = grid(0).jcache(null); + - cache0.loadCache(null, 0); ++ GridCache<Integer, CacheValue> cache0 = ((IgniteKernal)grid(0)).cache(null); + + checkCache(cache0); + checkQuery(cache0); + } + + /** + * @param cache Cache. + * @throws Exception If failed. + */ + private void checkCache(IgniteCache<Integer, CacheValue> cache) throws Exception { + Map<Integer, CacheValue> map = new HashMap<>(); + + for (Cache.Entry<Integer, CacheValue> entry : cache) + map.put(entry.getKey(), entry.getValue()); + + assert map.entrySet().size() == ENTRY_CNT : "Expected: " + ENTRY_CNT + ", but was: " + cache.size(); + assert map.keySet().size() == ENTRY_CNT : "Expected: " + ENTRY_CNT + ", but was: " + cache.size(); + assert map.values().size() == ENTRY_CNT : "Expected: " + ENTRY_CNT + ", but was: " + cache.size(); + assert cache.localSize() == ENTRY_CNT : "Expected: " + ENTRY_CNT + ", but was: " + cache.localSize(); + } + + /** + * @param cache Cache. + * @throws Exception If failed. + */ + private void checkQuery(IgniteCache<Integer, CacheValue> cache) throws Exception { + QueryCursor<Cache.Entry<Integer, CacheValue>> qry = + cache.query(Query.sql(CacheValue.class, "val >= 5")); + + Collection<Cache.Entry<Integer, CacheValue>> queried = qry.getAll(); + + assertEquals("Unexpected query result: " + queried, 5, queried.size()); + } + + /** + * Test cache value. + */ + private static class CacheValue { + @QuerySqlField + private final int val; + + CacheValue(int val) { + this.val = val; + } + + int value() { + return val; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(CacheValue.class, this); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/41bc7728/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxMultiNodeAbstractTest.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxMultiNodeAbstractTest.java index 0d06bb4,e3adab1..b4220e5 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxMultiNodeAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxMultiNodeAbstractTest.java @@@ -23,12 -23,9 +23,13 @@@ import org.apache.ignite.cache.affinity import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.cache.query.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.transactions.*; import org.apache.ignite.internal.processors.cache.distributed.dht.*; import org.apache.ignite.internal.processors.cache.distributed.near.*; + import org.apache.ignite.internal.processors.cache.query.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/41bc7728/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java index db326e6,a69ce36..966d818 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java @@@ -55,8 -54,8 +54,9 @@@ import static org.apache.ignite.cache.C import static org.apache.ignite.cache.CacheMode.*; import static org.apache.ignite.cache.CachePreloadMode.*; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; +import static org.apache.ignite.internal.processors.cache.query.CacheQueryType.*; import static org.apache.ignite.events.EventType.*; + import static org.apache.ignite.internal.processors.cache.query.CacheQueryType.*; /** * Continuous queries tests. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/41bc7728/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/reducefields/GridCacheAbstractReduceFieldsQuerySelfTest.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/reducefields/GridCacheAbstractReduceFieldsQuerySelfTest.java index 1da1957,0c48643..d69bd31 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/reducefields/GridCacheAbstractReduceFieldsQuerySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/reducefields/GridCacheAbstractReduceFieldsQuerySelfTest.java @@@ -21,7 -21,9 +21,10 @@@ import org.apache.ignite.cache.* import org.apache.ignite.cache.affinity.*; import org.apache.ignite.cache.query.annotations.*; import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.processors.cache.query.*; + import org.apache.ignite.internal.processors.cache.*; + import org.apache.ignite.internal.processors.cache.query.*; + import org.apache.ignite.internal.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.lang.*; import org.apache.ignite.marshaller.optimized.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/41bc7728/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/reducefields/GridCacheReduceFieldsQueryPartitionedSelfTest.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/reducefields/GridCacheReduceFieldsQueryPartitionedSelfTest.java index ae1adde,b7bc84e..c191be7 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/reducefields/GridCacheReduceFieldsQueryPartitionedSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/reducefields/GridCacheReduceFieldsQueryPartitionedSelfTest.java @@@ -19,6 -19,7 +19,8 @@@ package org.apache.ignite.internal.proc import org.apache.ignite.cache.*; import org.apache.ignite.internal.processors.cache.query.*; ++import org.apache.ignite.internal.processors.cache.query.*; + import org.apache.ignite.internal.*; import org.apache.ignite.lang.*; import java.util.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/41bc7728/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java ---------------------------------------------------------------------- diff --cc modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index faeed4c,e3da0b7..917f9b3 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@@ -19,8 -19,7 +19,9 @@@ package org.apache.ignite.internal.proc import org.apache.ignite.*; import org.apache.ignite.cache.*; +import org.apache.ignite.cache.query.*; +import org.apache.ignite.cache.query.annotations.*; + import org.apache.ignite.cache.query.annotations.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/41bc7728/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneIndex.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/41bc7728/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/41bc7728/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridIndexingWithNoopSwapSelfTest.java ---------------------------------------------------------------------- diff --cc modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridIndexingWithNoopSwapSelfTest.java index 7a3d472,0b775e9..99b29ec --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridIndexingWithNoopSwapSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridIndexingWithNoopSwapSelfTest.java @@@ -21,8 -21,9 +21,11 @@@ import org.apache.ignite.* import org.apache.ignite.cache.*; import org.apache.ignite.cache.eviction.fifo.*; import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.processors.cache.IgniteCacheAbstractQuerySelfTest.*; +import org.apache.ignite.internal.processors.cache.query.*; + import org.apache.ignite.internal.*; + import org.apache.ignite.internal.processors.cache.GridCacheAbstractQuerySelfTest.*; + import org.apache.ignite.internal.processors.cache.query.*; import org.apache.ignite.spi.discovery.tcp.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/41bc7728/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePartitionedQueryMultiThreadedSelfTest.java ---------------------------------------------------------------------- diff --cc modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePartitionedQueryMultiThreadedSelfTest.java index 899b481,0000000..e379536 mode 100644,000000..100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePartitionedQueryMultiThreadedSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePartitionedQueryMultiThreadedSelfTest.java @@@ -1,306 -1,0 +1,308 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.query.*; +import org.apache.ignite.cache.query.annotations.*; ++import org.apache.ignite.cache.query.annotations.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; ++import org.apache.ignite.internal.processors.cache.query.*; +import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.*; +import org.apache.ignite.testframework.junits.common.*; + +import javax.cache.*; +import java.io.*; +import java.util.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.cache.CacheDistributionMode.*; +import static org.apache.ignite.cache.CacheMode.*; +import static org.apache.ignite.cache.query.Query.*; + +/** + * Tests for partitioned cache queries. + */ +public class IgniteCachePartitionedQueryMultiThreadedSelfTest extends GridCommonAbstractTest { + /** */ + private static final boolean TEST_INFO = true; + + /** Number of test grids (nodes). Should not be less than 2. */ + private static final int GRID_CNT = 3; + + /** */ + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** Don't start grid by default. */ + public IgniteCachePartitionedQueryMultiThreadedSelfTest() { + super(false); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration c = super.getConfiguration(gridName); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(ipFinder); + + c.setDiscoverySpi(disco); + + CacheConfiguration cc = defaultCacheConfiguration(); + + cc.setCacheMode(PARTITIONED); + + // Query should be executed without ongoing transactions. + cc.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + cc.setBackups(0); + cc.setPreloadMode(CachePreloadMode.SYNC); + cc.setAtomicityMode(TRANSACTIONAL); + cc.setDistributionMode(NEAR_PARTITIONED); + c.setCacheConfiguration(cc); + + return c; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + assert GRID_CNT >= 2 : "Constant GRID_CNT must be greater than or equal to 2."; + + startGridsMultiThreaded(GRID_CNT); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + // Clean up all caches. + for (int i = 0; i < GRID_CNT; i++) + grid(i).jcache(null).removeAll(); + } + + /** {@inheritDoc} */ + @Override protected void info(String msg) { + if (TEST_INFO) + super.info(msg); + } + + /** + * JUnit. + * + * @throws Exception If failed. + */ + @SuppressWarnings({"TooBroadScope"}) + public void testLuceneAndSqlMultithreaded() throws Exception { + // ---------- Test parameters ---------- // + int luceneThreads = 10; + int sqlThreads = 10; + long duration = 10 * 1000; + final int logMod = 100; + + final Person p1 = new Person("Jon", 1500, "Master"); + final Person p2 = new Person("Jane", 2000, "Master"); + final Person p3 = new Person("Mike", 1800, "Bachelor"); + final Person p4 = new Person("Bob", 1900, "Bachelor"); + + final IgniteCache<UUID, Person> cache0 = grid(0).jcache(null); + + cache0.put(p1.id(), p1); + cache0.put(p2.id(), p2); + cache0.put(p3.id(), p3); + cache0.put(p4.id(), p4); + + assertEquals(4, cache0.localSize()); + - assert grid(0).nodes().size() == GRID_CNT; ++ assert grid(0).cluster().nodes().size() == GRID_CNT; + + final AtomicBoolean done = new AtomicBoolean(); + + final AtomicLong luceneCnt = new AtomicLong(); + + // Start lucene query threads. + IgniteInternalFuture<?> futLucene = GridTestUtils.runMultiThreadedAsync(new CAX() { + @Override public void applyx() throws IgniteCheckedException { + while (!done.get()) { + QueryCursor<Cache.Entry<UUID, Person>> master = + cache0.query(text(Person.class, "Master")); + + Collection<Cache.Entry<UUID, Person>> entries = master.getAll(); + + checkResult(entries, p1, p2); + + long cnt = luceneCnt.incrementAndGet(); + + if (cnt % logMod == 0) + info("Executed LUCENE queries: " + cnt); + } + } + }, luceneThreads, "LUCENE-THREAD"); + + final AtomicLong sqlCnt = new AtomicLong(); + + // Start sql query threads. + IgniteInternalFuture<?> futSql = GridTestUtils.runMultiThreadedAsync(new CAX() { + @Override public void applyx() throws IgniteCheckedException { + while (!done.get()) { + QueryCursor<Cache.Entry<UUID, Person>> bachelors = + cache0.query(sql(Person.class, "degree = 'Bachelor'")); + + Collection<Cache.Entry<UUID, Person>> entries = bachelors.getAll(); + + checkResult(entries, p3, p4); + + long cnt = sqlCnt.incrementAndGet(); + + if (cnt % logMod == 0) + info("Executed SQL queries: " + cnt); + } + } + }, sqlThreads, "SQL-THREAD"); + + Thread.sleep(duration); + + done.set(true); + + futLucene.get(); + futSql.get(); + } + + /** + * @param entries Queried result. + * @param persons Persons that should be in the result. + */ + private void checkResult(Iterable<Cache.Entry<UUID, Person>> entries, Person... persons) { + for (Cache.Entry<UUID, Person> entry : entries) { + assertEquals(entry.getKey(), entry.getValue().id()); + + assert F.asList(persons).contains(entry.getValue()); + } + } + + /** Test class. */ + private static class Person implements Externalizable { + /** */ + @GridToStringExclude + private UUID id = UUID.randomUUID(); + + /** */ + @QuerySqlField + private String name; + + /** */ + @QuerySqlField + private int salary; + + /** */ + @QuerySqlField + @QueryTextField + private String degree; + + /** Required by {@link Externalizable}. */ + public Person() { + // No-op. + } + + /** + * @param name Name. + * @param salary Salary. + * @param degree Degree. + */ + Person(String name, int salary, String degree) { + assert name != null; + assert salary > 0; + assert degree != null; + + this.name = name; + this.salary = salary; + this.degree = degree; + } + + /** @return Id. */ + UUID id() { + return id; + } + + /** @return Name. */ + String name() { + return name; + } + + /** @return Salary. */ + double salary() { + return salary; + } + + /** @return Degree. */ + String degree() { + return degree; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + U.writeUuid(out, id); + U.writeString(out, name); + out.writeInt(salary); + U.writeString(out, degree); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + id = U.readUuid(in); + name = U.readString(in); + salary = in.readInt(); + degree = U.readString(in); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return id.hashCode() + 31 * name.hashCode() + 31 * 31 * salary; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object obj) { + if (obj == this) + return true; + + if (!(obj instanceof Person)) + return false; + + Person that = (Person)obj; + + return that.id.equals(id) && that.name.equals(name) && that.salary == salary && that.degree.equals(degree); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(Person.class, this); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/41bc7728/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryLoadSelfTest.java ---------------------------------------------------------------------- diff --cc modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryLoadSelfTest.java index 356547a,0000000..dd84744 mode 100644,000000..100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryLoadSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryLoadSelfTest.java @@@ -1,325 -1,0 +1,324 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.query.annotations.*; +import org.apache.ignite.cache.query.*; ++import org.apache.ignite.cache.query.annotations.*; +import org.apache.ignite.cache.store.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.cache.query.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.junits.common.*; +import org.jetbrains.annotations.*; + +import javax.cache.*; +import javax.cache.configuration.*; +import java.util.*; + +import static org.apache.ignite.cache.CacheMode.*; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; + +/** + * Test that entries are indexed on load/reload methods. + */ +public class IgniteCacheQueryLoadSelfTest extends GridCommonAbstractTest { + /** IP finder. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** Puts count. */ + private static final int PUT_CNT = 10; + + /** Store map. */ + private static final Map<Integer, ValueObject> STORE_MAP = new HashMap<>(); + + /** */ + public IgniteCacheQueryLoadSelfTest() { + super(true); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + CacheConfiguration ccfg = defaultCacheConfiguration(); + + ccfg.setCacheMode(REPLICATED); + ccfg.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(new TestStore())); + ccfg.setReadThrough(true); + ccfg.setWriteThrough(true); + ccfg.setLoadPreviousValue(true); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + + cfg.setCacheConfiguration(ccfg); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(disco); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + cache().removeAll(); + + assert cache().isEmpty(); + assert size(ValueObject.class) == 0; + + STORE_MAP.clear(); + } + + /** + * Number of objects of given type in index. + * + * @param cls Value type. + * @return Objects number. + * @throws IgniteCheckedException If failed. + */ + private long size(Class<?> cls) throws IgniteCheckedException { + GridCacheQueryManager<Object, Object> qryMgr = ((IgniteKernal)grid()).internalCache().context().queries(); + + assert qryMgr != null; + + return qryMgr.size(cls); + } + + /** + * @throws Exception If failed. + */ + public void testLoadCache() throws Exception { + IgniteCache<Integer, ValueObject> cache = grid().jcache(null); + - cache.loadCache(null, 0); - + assert cache.size() == PUT_CNT; + + Collection<Cache.Entry<Integer, ValueObject>> res = + cache.query(new SqlQuery(ValueObject.class, "val >= 0")).getAll(); + + assertNotNull(res); + assertEquals(PUT_CNT, res.size()); + assertEquals(PUT_CNT, size(ValueObject.class)); + } + + /** + * @throws Exception If failed. + */ + public void testLoadCacheAsync() throws Exception { + IgniteCache<Integer, ValueObject> cache = grid().jcache(null); + + IgniteCache<Integer, ValueObject> asyncCache = cache.withAsync(); + + asyncCache.loadCache(null, 0); + + asyncCache.future().get(); + + assert cache.size() == PUT_CNT; + + Collection<Cache.Entry<Integer, ValueObject>> res = + cache.query(new SqlQuery(ValueObject.class, "val >= 0")).getAll(); + + assert res != null; + assert res.size() == PUT_CNT; + assert size(ValueObject.class) == PUT_CNT; + } + + /** + * @throws Exception If failed. + */ + public void testLoadCacheFiltered() throws Exception { + IgniteCache<Integer, ValueObject> cache = grid().jcache(null); + + cache.loadCache(new P2<Integer, ValueObject>() { + @Override public boolean apply(Integer key, ValueObject val) { + return key >= 5; + } - }, 0); ++ }); + + assert cache.size() == PUT_CNT - 5; + + Collection<Cache.Entry<Integer, ValueObject>> res = + cache.query(new SqlQuery(ValueObject.class, "val >= 0")).getAll(); + + assert res != null; + assert res.size() == PUT_CNT - 5; + assert size(ValueObject.class) == PUT_CNT - 5; + } + + /** + * @throws Exception If failed. + */ + public void testLoadCacheAsyncFiltered() throws Exception { + IgniteCache<Integer, ValueObject> cache = grid().jcache(null); + + IgniteCache<Integer, ValueObject> asyncCache = cache.withAsync(); + + asyncCache.loadCache(new P2<Integer, ValueObject>() { + @Override + public boolean apply(Integer key, ValueObject val) { + return key >= 5; + } + }, 0); + + asyncCache.future().get(); + + assert cache.localSize() == PUT_CNT - 5; + + Collection<Cache.Entry<Integer, ValueObject>> res = + cache.query(new SqlQuery(ValueObject.class, "val >= 0")).getAll(); + + assert res != null; + assert res.size() == PUT_CNT - 5; + assert size(ValueObject.class) == PUT_CNT - 5; + } + + /** + * @throws Exception If failed. + */ + public void testReloadAsync() throws Exception { + STORE_MAP.put(1, new ValueObject(1)); + + GridCache<Integer, ValueObject> cache = cache(); + + assert cache.reloadAsync(1).get().value() == 1; + + assert cache.size() == 1; + + Collection<Map.Entry<Integer, ValueObject>> res = + cache.queries().createSqlQuery(ValueObject.class, "val >= 0").execute().get(); + + assert res != null; + assert res.size() == 1; + assert size(ValueObject.class) == 1; + } + + /** + * @throws Exception If failed. + */ + public void testReloadAll() throws Exception { + for (int i = 0; i < PUT_CNT; i++) + STORE_MAP.put(i, new ValueObject(i)); + + GridCache<Integer, ValueObject> cache = cache(); + + Integer[] keys = new Integer[PUT_CNT - 5]; + + for (int i = 0; i < PUT_CNT - 5; i++) + keys[i] = i + 5; + + cache.reloadAll(F.asList(keys)); + + assert cache.size() == PUT_CNT - 5; + + Collection<Map.Entry<Integer, ValueObject>> res = + cache.queries().createSqlQuery(ValueObject.class, "val >= 0").execute().get(); + + assert res != null; + assert res.size() == PUT_CNT - 5; + assert size(ValueObject.class) == PUT_CNT - 5; + + cache.clear(); + + assert cache.isEmpty(); + assertEquals(0, cache.size()); + + cache.reloadAll(Arrays.asList(keys)); + + assertEquals(PUT_CNT - 5, cache.size()); + + res = cache.queries().createSqlQuery(ValueObject.class, "val >= 0").execute().get(); + + assert res != null; + assert res.size() == PUT_CNT - 5; + assert size(ValueObject.class) == PUT_CNT - 5; + } + + /** + * Test store. + */ + private static class TestStore extends CacheStoreAdapter<Integer, ValueObject> { + /** {@inheritDoc} */ + @Override public void loadCache(IgniteBiInClosure<Integer, ValueObject> clo, @Nullable Object... args) { + assert clo != null; + + for (int i = 0; i < PUT_CNT; i++) + clo.apply(i, new ValueObject(i)); + } + + /** {@inheritDoc} */ + @Override public ValueObject load(Integer key) { + assert key != null; + + return STORE_MAP.get(key); + } + + /** {@inheritDoc} */ + @Override public void write(javax.cache.Cache.Entry<? extends Integer, ? extends ValueObject> e) { + assert e != null; + assert e.getKey() != null; + assert e.getValue() != null; + + STORE_MAP.put(e.getKey(), e.getValue()); + } + + /** {@inheritDoc} */ + @Override public void delete(Object key) { + assert key != null; + + STORE_MAP.remove(key); + } + } + + /** + * Value object class. + */ + private static class ValueObject { + /** Value. */ + @QuerySqlField + private final int val; + + /** + * @param val Value. + */ + ValueObject(int val) { + this.val = val; + } + + /** + * @return Value. + */ + int value() { + return val; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ValueObject.class, this); + } + } +}