http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/922a2bab/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueryManager.java index d221c47..dbb2665 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueryManager.java @@ -21,10 +21,10 @@ import org.gridgain.grid.cache.*; import org.gridgain.grid.cache.query.*; import org.gridgain.grid.kernal.*; import org.gridgain.grid.kernal.managers.eventstorage.*; -import org.gridgain.grid.kernal.managers.indexing.*; import org.gridgain.grid.kernal.processors.cache.*; import org.gridgain.grid.kernal.processors.cache.datastructures.*; import org.gridgain.grid.kernal.processors.cache.distributed.dht.*; +import org.gridgain.grid.kernal.processors.query.*; import org.gridgain.grid.kernal.processors.task.*; import org.gridgain.grid.util.*; import org.gridgain.grid.util.future.*; @@ -36,7 +36,6 @@ import org.jdk8.backport.*; import org.jetbrains.annotations.*; import java.io.*; -import java.lang.reflect.*; import java.sql.*; import java.util.*; import java.util.concurrent.*; @@ -52,10 +51,7 @@ import static org.gridgain.grid.kernal.processors.cache.query.GridCacheQueryType @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapter<K, V> { /** */ - protected GridIndexingManager idxMgr; - - /** Indexing SPI name. */ - private String spi; + protected GridQueryProcessor idxProc; /** */ private String space; @@ -82,8 +78,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte /** {@inheritDoc} */ @Override public void start0() throws GridException { - idxMgr = cctx.kernalContext().indexing(); - spi = cctx.config().getIndexingSpiName(); + idxProc = cctx.kernalContext().query(); space = cctx.name(); maxIterCnt = cctx.config().getMaximumQueryIteratorCount(); @@ -170,7 +165,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte throw new IllegalStateException("Failed to get size (grid is stopping)."); try { - return idxMgr.size(spi, space, valType); + return idxProc.size(space, valType); } finally { leaveBusy(); @@ -198,7 +193,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte throw new IllegalStateException("Failed to rebuild indexes (grid is stopping)."); try { - return idxMgr.rebuildIndexes(spi, space, typeName); + return idxProc.rebuildIndexes(space, typeName); } finally { leaveBusy(); @@ -215,7 +210,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte throw new IllegalStateException("Failed to rebuild indexes (grid is stopping)."); try { - return idxMgr.rebuildAllIndexes(spi); + return idxProc.rebuildAllIndexes(); } finally { leaveBusy(); @@ -260,14 +255,14 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte * * @param swapSpaceName Swap space name. * @param key Key. - * @throws org.apache.ignite.spi.IgniteSpiException If failed. + * @throws GridException If failed. */ - public void onSwap(String swapSpaceName, K key) throws IgniteSpiException { + public void onSwap(String swapSpaceName, K key) throws GridException { if (!enterBusy()) return; // Ignore index update when node is stopping. try { - idxMgr.onSwap(spi, space, swapSpaceName, key); + idxProc.onSwap(space, key); } finally { leaveBusy(); @@ -280,14 +275,14 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte * @param key Key. * @param val Value * @param valBytes Value bytes. - * @throws org.apache.ignite.spi.IgniteSpiException If failed. + * @throws GridException If failed. */ - public void onUnswap(K key, V val, byte[] valBytes) throws IgniteSpiException { + public void onUnswap(K key, V val, byte[] valBytes) throws GridException { if (!enterBusy()) return; // Ignore index update when node is stopping. try { - idxMgr.onUnswap(spi, space, key, val, valBytes); + idxProc.onUnswap(space, key, val, valBytes); } finally { leaveBusy(); @@ -329,7 +324,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte if (val == null) val = cctx.marshaller().unmarshal(valBytes, cctx.deploy().globalLoader()); - idxMgr.store(spi, space, key, keyBytes, val, valBytes, CU.versionToBytes(ver), expirationTime); + idxProc.store(space, key, keyBytes, val, valBytes, CU.versionToBytes(ver), expirationTime); } finally { invalidateResultCache(); @@ -341,21 +336,20 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte /** * @param key Key. * @param keyBytes Byte array with key value. - * @return {@code true} if key was found and removed, otherwise {@code false}. * @throws GridException Thrown in case of any errors. */ @SuppressWarnings("SimplifiableIfStatement") - public boolean remove(K key, @Nullable byte[] keyBytes) throws GridException { + public void remove(K key, @Nullable byte[] keyBytes) throws GridException { assert key != null; if (!cctx.config().isQueryIndexEnabled() && !(key instanceof GridCacheInternal)) - return false; // No-op. + return; // No-op. if (!enterBusy()) - return false; // Ignore index update when node is stopping. + return; // Ignore index update when node is stopping. try { - return idxMgr.remove(spi, space, key, keyBytes); + idxProc.remove(space, key); } finally { invalidateResultCache(); @@ -374,7 +368,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte return; // Ignore index update when node is stopping. try { - idxMgr.onUndeploy(space, ldr); + idxProc.onUndeploy(space, ldr); } catch (GridException e) { throw new GridRuntimeException(e); @@ -473,7 +467,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte else res = new QueryResult<>(qry.type(), recipient); - GridCloseableIterator<IndexingKeyValueRow<K, V>> iter; + GridCloseableIterator<IgniteBiTuple<K, V>> iter; try { switch (qry.type()) { @@ -494,8 +488,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte taskName)); } - iter = idxMgr.query(spi, space, qry.clause(), F.asList(args), - qry.queryClassName(), qry.includeBackups(), projectionFilter(qry)); + iter = idxProc.query(space, qry.clause(), F.asList(args), + qry.queryClassName(), filter(qry)); break; @@ -537,8 +531,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte taskName)); } - iter = idxMgr.queryText(spi, space, qry.clause(), qry.queryClassName(), - qry.includeBackups(), projectionFilter(qry)); + iter = idxProc.queryText(space, qry.clause(), qry.queryClassName(), filter(qry)); break; @@ -583,56 +576,92 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte boolean loc, @Nullable UUID subjId, @Nullable String taskName, Object recipient) throws GridException { assert qry != null; - if (qry.clause() == null) { - assert !loc; + FieldsResult res; - throw new GridException("Received next page request after iterator was removed. " + - "Consider increasing maximum number of stored iterators (see " + - "GridCacheConfiguration.getMaximumQueryIteratorCount() configuration property)."); - } + T2<String, List<Object>> resKey = null; - assert qry.type() == SQL_FIELDS; + if (qry.type() == SQL_FIELDS) { + if (qry.clause() == null) { + assert !loc; - if (cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_EXECUTED)) { - cctx.gridEvents().record(new IgniteCacheQueryExecutedEvent<>( - cctx.localNode(), - "SQL fields query executed.", - EVT_CACHE_QUERY_EXECUTED, - org.gridgain.grid.cache.query.GridCacheQueryType.SQL_FIELDS, - cctx.namex(), - null, - qry.clause(), - null, - null, - args, - subjId, - taskName)); - } + throw new GridException("Received next page request after iterator was removed. " + + "Consider increasing maximum number of stored iterators (see " + + "GridCacheConfiguration.getMaximumQueryIteratorCount() configuration property)."); + } - T2<String, List<Object>> resKey = new T2<>(qry.clause(), F.asList(args)); + if (cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_EXECUTED)) { + cctx.gridEvents().record(new IgniteCacheQueryExecutedEvent<>( + cctx.localNode(), + "SQL fields query executed.", + EVT_CACHE_QUERY_EXECUTED, + org.gridgain.grid.cache.query.GridCacheQueryType.SQL_FIELDS, + cctx.namex(), + null, + qry.clause(), + null, + null, + args, + subjId, + taskName)); + } - FieldsResult res = (FieldsResult)qryResCache.get(resKey); + // Attempt to get result from cache. + resKey = new T2<>(qry.clause(), F.asList(args)); - if (res != null && res.addRecipient(recipient)) - return res; + res = (FieldsResult)qryResCache.get(resKey); + + if (res != null && res.addRecipient(recipient)) + return res; // Cached result found. + + res = new FieldsResult(recipient); + + if (qryResCache.putIfAbsent(resKey, res) != null) + resKey = null; // Failed to cache result. + } + else { + assert qry.type() == SPI; - res = new FieldsResult(recipient); + if (cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_EXECUTED)) { + cctx.gridEvents().record(new IgniteCacheQueryExecutedEvent<>( + cctx.localNode(), + "SPI query executed.", + EVT_CACHE_QUERY_EXECUTED, + org.gridgain.grid.cache.query.GridCacheQueryType.SPI, + cctx.namex(), + null, + null, + null, + null, + args, + subjId, + taskName)); + } - boolean cached = qryResCache.putIfAbsent(resKey, res) == null; + res = new FieldsResult(recipient); + } try { - IndexingFieldsResult qryRes = idxMgr.queryFields(spi, space, qry.clause(), F.asList(args), - qry.includeBackups(), projectionFilter(qry)); + if (qry.type() == SPI) { + IgniteSpiCloseableIterator<?> iter = cctx.kernalContext().indexing().query(space, F.asList(args), + filter(qry)); + + res.onDone(iter); + } + else { + assert qry.type() == SQL_FIELDS; + + GridQueryFieldsResult qryRes = idxProc.queryFields(space, qry.clause(), F.asList(args), filter(qry)); - res.metaData(qryRes.metaData()); + res.metaData(qryRes.metaData()); - res.onDone(qryRes.iterator()); + res.onDone(qryRes.iterator()); + } } catch (Exception e) { res.onDone(e); } finally { - if (cached) + if (resKey != null) qryResCache.remove(resKey, res); } @@ -643,7 +672,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte * @param qry Query. * @return Cache set items iterator. */ - private GridCloseableIterator<IndexingKeyValueRow<K, V>> setIterator(GridCacheQueryAdapter<?> qry) { + private GridCloseableIterator<IgniteBiTuple<K, V>> setIterator(GridCacheQueryAdapter<?> qry) { final GridSetQueryPredicate filter = (GridSetQueryPredicate)qry.scanFilter(); filter.init(cctx); @@ -655,11 +684,11 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte if (data == null) data = Collections.emptyList(); - final GridIterator<IndexingKeyValueRow<K, V>> it = F.iterator( + final GridIterator<IgniteBiTuple<K, V>> it = F.iterator( data, - new C1<GridCacheSetItemKey, IndexingKeyValueRow<K, V>>() { - @Override public IndexingKeyValueRow<K, V> apply(GridCacheSetItemKey e) { - return new IndexingKeyValueRowAdapter<>((K)e.item(), (V)Boolean.TRUE); + new C1<GridCacheSetItemKey, IgniteBiTuple<K, V>>() { + @Override public IgniteBiTuple<K, V> apply(GridCacheSetItemKey e) { + return new IgniteBiTuple<>((K)e.item(), (V)Boolean.TRUE); } }, true, @@ -669,12 +698,12 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte } }); - return new GridCloseableIteratorAdapter<IndexingKeyValueRow<K, V>>() { + return new GridCloseableIteratorAdapter<IgniteBiTuple<K, V>>() { @Override protected boolean onHasNext() { return it.hasNext(); } - @Override protected IndexingKeyValueRow<K, V> onNext() { + @Override protected IgniteBiTuple<K, V> onNext() { return it.next(); } @@ -694,7 +723,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte * @throws GridException If failed to get iterator. */ @SuppressWarnings({"unchecked"}) - private GridCloseableIterator<IndexingKeyValueRow<K, V>> scanIterator(final GridCacheQueryAdapter<?> qry) + private GridCloseableIterator<IgniteBiTuple<K, V>> scanIterator(final GridCacheQueryAdapter<?> qry) throws GridException { IgnitePredicate<GridCacheEntry<K, V>> filter = null; @@ -717,8 +746,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte injectResources(keyValFilter); - GridIterator<IndexingKeyValueRow<K, V>> heapIt = new GridIteratorAdapter<IndexingKeyValueRow<K, V>>() { - private IndexingKeyValueRow<K, V> next; + GridIterator<IgniteBiTuple<K, V>> heapIt = new GridIteratorAdapter<IgniteBiTuple<K, V>>() { + private IgniteBiTuple<K, V> next; private Iterator<K> iter = prj.keySet().iterator(); @@ -730,11 +759,11 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte return next != null; } - @Override public IndexingKeyValueRow<K, V> nextX() { + @Override public IgniteBiTuple<K, V> nextX() { if (next == null) throw new NoSuchElementException(); - IndexingKeyValueRow<K, V> next0 = next; + IgniteBiTuple<K, V> next0 = next; advance(); @@ -766,7 +795,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte } next = next0 != null ? - new IndexingKeyValueRowAdapter<>(next0.getKey(), next0.getValue()) : + new IgniteBiTuple<>(next0.getKey(), next0.getValue()) : null; } @@ -781,10 +810,10 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte } }; - final GridIterator<IndexingKeyValueRow<K, V>> it; + final GridIterator<IgniteBiTuple<K, V>> it; if (cctx.isSwapOrOffheapEnabled()) { - List<GridIterator<IndexingKeyValueRow<K, V>>> iters = new ArrayList<>(3); + List<GridIterator<IgniteBiTuple<K, V>>> iters = new ArrayList<>(3); iters.add(heapIt); @@ -799,12 +828,12 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte else it = heapIt; - return new GridCloseableIteratorAdapter<IndexingKeyValueRow<K, V>>() { + return new GridCloseableIteratorAdapter<IgniteBiTuple<K, V>>() { @Override protected boolean onHasNext() { return it.hasNext(); } - @Override protected IndexingKeyValueRow<K, V> onNext() { + @Override protected IgniteBiTuple<K, V> onNext() { return it.next(); } @@ -819,7 +848,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte * @return Swap iterator. * @throws GridException If failed. */ - private GridIterator<IndexingKeyValueRow<K, V>> swapIterator(GridCacheQueryAdapter<?> qry) + private GridIterator<IgniteBiTuple<K, V>> swapIterator(GridCacheQueryAdapter<?> qry) throws GridException { IgnitePredicate<GridCacheEntry<Object, Object>> prjPred = qry.projectionFilter(); @@ -834,7 +863,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte * @param qry Query. * @return Offheap iterator. */ - private GridIterator<IndexingKeyValueRow<K, V>> offheapIterator(GridCacheQueryAdapter<?> qry) { + private GridIterator<IgniteBiTuple<K, V>> offheapIterator(GridCacheQueryAdapter<?> qry) { IgnitePredicate<GridCacheEntry<Object, Object>> prjPred = qry.projectionFilter(); IgniteBiPredicate<K, V> filter = qry.scanFilter(); @@ -858,7 +887,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte * @param keepPortable Keep portable flag. * @return Iterator. */ - private GridIteratorAdapter<IndexingKeyValueRow<K, V>> scanIterator( + private GridIteratorAdapter<IgniteBiTuple<K, V>> scanIterator( @Nullable final Iterator<Map.Entry<byte[], byte[]>> it, @Nullable final IgnitePredicate<GridCacheEntry<Object, Object>> prjPred, @Nullable final IgniteBiPredicate<K, V> filter, @@ -866,8 +895,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte if (it == null) return new GridEmptyCloseableIterator<>(); - return new GridIteratorAdapter<IndexingKeyValueRow<K, V>>() { - private IndexingKeyValueRow<K, V> next; + return new GridIteratorAdapter<IgniteBiTuple<K, V>>() { + private IgniteBiTuple<K, V> next; { advance(); @@ -877,11 +906,11 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte return next != null; } - @Override public IndexingKeyValueRow<K, V> nextX() { + @Override public IgniteBiTuple<K, V> nextX() { if (next == null) throw new NoSuchElementException(); - IndexingKeyValueRow<K, V> next0 = next; + IgniteBiTuple<K, V> next0 = next; advance(); @@ -913,7 +942,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte continue; } - next = new IndexingKeyValueRowAdapter<>(e.key(), e.value()); + next = new IgniteBiTuple<>(e.key(), e.value()); break; } @@ -962,8 +991,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte try { // Preparing query closures. IgnitePredicate<GridCacheEntry<Object, Object>> prjFilter = qryInfo.projectionPredicate(); - IgniteClosure<List<?>, Object> trans = (IgniteClosure<List<?>, Object>)qryInfo.transformer(); - IgniteReducer<List<?>, Object> rdc = (IgniteReducer<List<?>, Object>)qryInfo.reducer(); + IgniteClosure<Object, Object> trans = (IgniteClosure<Object, Object>)qryInfo.transformer(); + IgniteReducer<Object, Object> rdc = (IgniteReducer<Object, Object>)qryInfo.reducer(); injectResources(prjFilter); injectResources(trans); @@ -974,7 +1003,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte int pageSize = qry.pageSize(); Collection<Object> data = null; - Collection<List<IndexingEntity<?>>> entities = null; + Collection<Object> entities = null; if (qryInfo.local() || rdc != null || cctx.isLocalNode(qryInfo.senderId())) data = new ArrayList<>(pageSize); @@ -989,14 +1018,14 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte fieldsQueryResult(qryInfo, taskName); // If metadata needs to be returned to user and cleaned from internal fields - copy it. - List<IndexingFieldMetadata> meta = qryInfo.includeMetaData() ? - (res.metaData() != null ? new ArrayList<>(res.metaData()) : null) : + List<GridQueryFieldMetadata> meta = qryInfo.includeMetaData() ? + (res.metaData() != null ? new ArrayList<GridQueryFieldMetadata>(res.metaData()) : null) : res.metaData(); if (!qryInfo.includeMetaData()) meta = null; - GridCloseableIterator<List<IndexingEntity<?>>> it = new GridSpiCloseableIteratorWrapper<>( + GridCloseableIterator<?> it = new GridSpiCloseableIteratorWrapper<Object>( res.iterator(recipient(qryInfo.senderId(), qryInfo.requestId()))); if (log.isDebugEnabled()) @@ -1015,7 +1044,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte boolean metaSent = false; while (!Thread.currentThread().isInterrupted() && it.hasNext()) { - List<IndexingEntity<?>> row = it.next(); + Object row = it.next(); // Query is cancelled. if (row == null) { @@ -1041,26 +1070,17 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte null, null, null, - F.viewListReadOnly(row, new CX1<IndexingEntity<?>, Object>() { - @Override public Object applyx(IndexingEntity<?> ent) throws GridException { - return ent.value(); - } - }))); + row)); } if ((qryInfo.local() || rdc != null || cctx.isLocalNode(qryInfo.senderId()))) { - List<Object> fields = new ArrayList<>(row.size()); - - for (IndexingEntity<?> ent : row) - fields.add(ent.value()); - // Reduce. if (rdc != null) { - if (!rdc.collect(fields)) + if (!rdc.collect(row)) break; } else - data.add(fields); + data.add(row); } else entities.add(row); @@ -1153,7 +1173,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte String taskName = cctx.kernalContext().task().resolveTaskName(qry.taskHash()); - IgniteSpiCloseableIterator<IndexingKeyValueRow<K, V>> iter; + IgniteSpiCloseableIterator<IgniteBiTuple<K, V>> iter; GridCacheQueryType type; QueryResult<K, V> res; @@ -1182,7 +1202,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte long topVer = cctx.affinity().affinityTopologyVersion(); while (!Thread.currentThread().isInterrupted() && iter.hasNext()) { - IndexingKeyValueRow<K, V> row = iter.next(); + IgniteBiTuple<K, V> row = iter.next(); // Query is cancelled. if (row == null) { @@ -1191,7 +1211,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte break; } - K key = row.key().value(); + K key = row.getKey(); // Filter backups for SCAN queries. Other types are filtered in indexing manager. if (!cctx.isReplicated() && cctx.config().getCacheMode() != LOCAL && qry.type() == SCAN && @@ -1204,11 +1224,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte continue; } - IndexingEntity<V> v = row.value(); - - assert v != null && v.hasValue(); - - V val = v.value(); + V val = row.getValue(); if (log.isDebugEnabled()) log.debug("Record [key=" + key + ", val=" + val + ", incBackups=" + @@ -1589,6 +1605,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte } if (fut != null) { + assert fut.isDone(); + try { fut.get().closeIfNotShared(recipient(sndId, reqId)); } @@ -1623,34 +1641,12 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte * @return {@code true} if page was processed right. */ protected abstract boolean onFieldsPageReady(boolean loc, GridCacheQueryInfo qryInfo, - @Nullable List<IndexingFieldMetadata> metaData, - @Nullable Collection<List<IndexingEntity<?>>> entities, + @Nullable List<GridQueryFieldMetadata> metaData, + @Nullable Collection<?> entities, @Nullable Collection<?> data, boolean finished, @Nullable Throwable e); /** - * Checks if a given query class is a Java primitive or wrapper - * and throws {@link IllegalStateException} if there is configured {@code GridH2IndexingSpi} - * with disabled {@code GridH2IndexingSpi#isDefaultIndexPrimitiveKey()}. - * - * @param cls Query class. May be {@code null}. - * @throws IllegalStateException If checking failed. - */ - private void checkPrimitiveIndexEnabled(@Nullable Class<?> cls) { - if (cls == null) - return; - - if (GridUtils.isPrimitiveOrWrapper(cls)) { - for (IndexingSpi indexingSpi : cctx.gridConfig().getIndexingSpi()) { - if (!isDefaultIndexPrimitiveKey(indexingSpi)) - throw new IllegalStateException("Invalid use of primitive class type in queries when " + - "GridH2IndexingSpi.isDefaultIndexPrimitiveKey() is disabled " + - "(consider enabling indexing for primitive types)."); - } - } - } - - /** * Gets cache queries metrics. * * @return Cache queries metrics. @@ -1685,7 +1681,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte throw new IllegalStateException("Failed to get metadata (grid is stopping)."); try { - Callable<Collection<CacheSqlMetadata>> job = new MetadataJob(spi); + Callable<Collection<CacheSqlMetadata>> job = new MetadataJob(); // Remote nodes that have current cache. Collection<ClusterNode> nodes = F.view(cctx.discovery().remoteNodes(), new P1<ClusterNode>() { @@ -1747,7 +1743,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte * @return Filter. */ @SuppressWarnings("unchecked") - @Nullable private IndexingQueryFilter projectionFilter(GridCacheQueryAdapter<?> qry) { + @Nullable private GridIndexingQueryFilter projectionFilter(GridCacheQueryAdapter<?> qry) { assert qry != null; final IgnitePredicate<GridCacheEntry<Object, Object>> prjFilter = qry.projectionFilter(); @@ -1755,8 +1751,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte if (prjFilter == null || F.isAlwaysTrue(prjFilter)) return null; - return new IndexingQueryFilter() { - @Nullable @Override public IgniteBiPredicate<K, V> forSpace(String spaceName) throws GridException { + return new GridIndexingQueryFilter() { + @Nullable @Override public IgniteBiPredicate<K, V> forSpace(String spaceName) { if (!F.eq(space, spaceName)) return null; @@ -1777,23 +1773,73 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte } /** - * @param indexingSpi Indexing SPI. - * @return {@code True} if given SPI is GridH2IndexingSpi with enabled property {@code isDefaultIndexPrimitiveKey}. + * @param <K> Key type. + * @param <V> Value type. + * @return Predicate. + * @param includeBackups Include backups. */ - private static boolean isDefaultIndexPrimitiveKey(IndexingSpi indexingSpi) { - if (indexingSpi.getClass().getName().equals(GridComponentType.H2_INDEXING.className())) { - try { - Method method = indexingSpi.getClass().getMethod("isDefaultIndexPrimitiveKey"); + @SuppressWarnings("unchecked") + @Nullable public <K, V> GridIndexingQueryFilter backupsFilter(boolean includeBackups) { + if (includeBackups) + return null; - return (Boolean)method.invoke(indexingSpi); - } - catch (Exception e) { - throw new GridRuntimeException("Failed to invoke 'isDefaultIndexPrimitiveKey' method " + - "on GridH2IndexingSpi.", e); + return new GridIndexingQueryFilter() { + @Nullable @Override public IgniteBiPredicate<K, V> forSpace(final String spaceName) { + final GridKernalContext ctx = cctx.kernalContext(); + + final GridCacheAdapter<Object, Object> cache = ctx.cache().internalCache(spaceName); + + if (cache.context().isReplicated() || cache.configuration().getBackups() == 0) + return null; + + return new IgniteBiPredicate<K, V>() { + @Override public boolean apply(K k, V v) { + return cache.context().affinity().primary(ctx.discovery().localNode(), k, -1); + } + }; } - } + }; + } - return false; + /** + * @param qry Query. + * @return Filter. + */ + private GridIndexingQueryFilter filter(GridCacheQueryAdapter<?> qry) { + return and(backupsFilter(qry.includeBackups()), projectionFilter(qry)); + } + + /** + * @param f1 First filter. + * @param f2 Second filter. + * @return And filter of the given two. + */ + @Nullable private static GridIndexingQueryFilter and(@Nullable final GridIndexingQueryFilter f1, + @Nullable final GridIndexingQueryFilter f2) { + if (f1 == null) + return f2; + + if (f2 == null) + return f1; + + return new GridIndexingQueryFilter() { + @Nullable @Override public <K, V> IgniteBiPredicate<K, V> forSpace(String spaceName) { + final IgniteBiPredicate<K, V> fltr1 = f1.forSpace(spaceName); + final IgniteBiPredicate<K, V> fltr2 = f2.forSpace(spaceName); + + if (fltr1 == null) + return fltr2; + + if (fltr2 == null) + return fltr1; + + return new IgniteBiPredicate<K, V>() { + @Override public boolean apply(K k, V v) { + return fltr1.apply(k, v) && fltr2.apply(k, v); + } + }; + } + }; } /** @@ -1825,16 +1871,6 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte @IgniteInstanceResource private Ignite ignite; - /** Indexing SPI name. */ - private final String spiName; - - /** - * @param spiName Indexing SPI name. - */ - private MetadataJob(@Nullable String spiName) { - this.spiName = spiName; - } - /** {@inheritDoc} */ @Override public Collection<CacheSqlMetadata> call() { final GridKernalContext ctx = ((GridKernal) ignite).context(); @@ -1847,15 +1883,14 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte }, new P1<GridCache<?, ?>>() { @Override public boolean apply(GridCache<?, ?> c) { - return !CU.UTILITY_CACHE_NAME.equals(c.name()) && - F.eq(spiName, c.configuration().getIndexingSpiName()); + return !CU.UTILITY_CACHE_NAME.equals(c.name()); } } ); return F.transform(cacheNames, new C1<String, CacheSqlMetadata>() { @Override public CacheSqlMetadata apply(String cacheName) { - Collection<IndexingTypeDescriptor> types = ctx.indexing().types(cacheName); + Collection<GridQueryTypeDescriptor> types = ctx.query().types(cacheName); Collection<String> names = U.newHashSet(types.size()); Map<String, String> keyClasses = U.newHashMap(types.size()); @@ -1863,7 +1898,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte Map<String, Map<String, String>> fields = U.newHashMap(types.size()); Map<String, Collection<GridCacheSqlIndexMetadata>> indexes = U.newHashMap(types.size()); - for (IndexingTypeDescriptor type : types) { + for (GridQueryTypeDescriptor type : types) { // Filter internal types (e.g., data structures). if (type.name().startsWith("GridCache")) continue; @@ -1892,11 +1927,11 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte Collection<GridCacheSqlIndexMetadata> indexesCol = new ArrayList<>(type.indexes().size()); - for (Map.Entry<String, IndexDescriptor> e : type.indexes().entrySet()) { - IndexDescriptor desc = e.getValue(); + for (Map.Entry<String, GridQueryIndexDescriptor> e : type.indexes().entrySet()) { + GridQueryIndexDescriptor desc = e.getValue(); // Add only SQL indexes. - if (desc.type() == IndexType.SORTED) { + if (desc.type() == GridQueryIndexType.SORTED) { Collection<String> idxFields = e.getValue().fields(); Collection<String> descendings = new LinkedList<>(); @@ -2142,8 +2177,9 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte } /** + * */ - private static class QueryResult<K, V> extends CachedResult<IndexingKeyValueRow<K, V>> { + private static class QueryResult<K, V> extends CachedResult<IgniteBiTuple<K, V>> { /** */ private static final long serialVersionUID = 0L; @@ -2171,12 +2207,12 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte /** * */ - private static class FieldsResult extends CachedResult<List<IndexingEntity<?>>> { + private static class FieldsResult<Q> extends CachedResult<Q> { /** */ private static final long serialVersionUID = 0L; /** */ - private List<IndexingFieldMetadata> meta; + private List<GridQueryFieldMetadata> meta; /** * @param recipient ID of the recipient. @@ -2188,7 +2224,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte /** * @return Metadata. */ - public List<IndexingFieldMetadata> metaData() throws GridException { + public List<GridQueryFieldMetadata> metaData() throws GridException { get(); // Ensure that result is ready. return meta; @@ -2197,7 +2233,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte /** * @param meta Metadata. */ - public void metaData(List<IndexingFieldMetadata> meta) { + public void metaData(List<GridQueryFieldMetadata> meta) { this.meta = meta; } } @@ -2387,7 +2423,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte * */ private class OffheapIteratorClosure - extends CX2<T2<Long, Integer>, T2<Long, Integer>, IndexingKeyValueRow<K, V>> { + extends CX2<T2<Long, Integer>, T2<Long, Integer>, IgniteBiTuple<K, V>> { /** */ private static final long serialVersionUID = 7410163202728985912L; @@ -2417,7 +2453,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte } /** {@inheritDoc} */ - @Nullable @Override public IndexingKeyValueRow<K, V> applyx(T2<Long, Integer> keyPtr, + @Nullable @Override public IgniteBiTuple<K, V> applyx(T2<Long, Integer> keyPtr, T2<Long, Integer> valPtr) throws GridException { LazyOffheapEntry e = new LazyOffheapEntry(keyPtr, valPtr); @@ -2437,7 +2473,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte return null; } - return new IndexingKeyValueRowAdapter<>(e.key(), (V)cctx.unwrapTemporary(e.value())) ; + return new IgniteBiTuple<>(e.key(), (V)cctx.unwrapTemporary(e.value())) ; } }
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/922a2bab/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueryMetadataAware.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueryMetadataAware.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueryMetadataAware.java index 2507369..66060e0 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueryMetadataAware.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueryMetadataAware.java @@ -9,8 +9,8 @@ package org.gridgain.grid.kernal.processors.cache.query; +import org.gridgain.grid.kernal.processors.query.*; import org.apache.ignite.lang.*; -import org.apache.ignite.spi.indexing.*; import java.util.*; @@ -21,5 +21,5 @@ public interface GridCacheQueryMetadataAware { /** * @return Future to retrieve metadata. */ - public IgniteFuture<List<IndexingFieldMetadata>> metadata(); + public IgniteFuture<List<GridQueryFieldMetadata>> metadata(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/922a2bab/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueryRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueryRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueryRequest.java index d76b4f7..221eebb 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueryRequest.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueryRequest.java @@ -205,8 +205,8 @@ public class GridCacheQueryRequest<K, V> extends GridCacheMessage<K, V> implemen int taskHash ) { assert type != null || fields; - assert clause != null || (type == SCAN || type == SET); - assert clsName != null || fields || type == SCAN || type == SET; + assert clause != null || (type == SCAN || type == SET || type == SPI); + assert clsName != null || fields || type == SCAN || type == SET || type == SPI; this.cacheId = cacheId; this.id = id; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/922a2bab/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueryResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueryResponse.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueryResponse.java index 0564d2a..479f2e8 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueryResponse.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueryResponse.java @@ -9,11 +9,10 @@ package org.gridgain.grid.kernal.processors.cache.query; -import org.apache.ignite.spi.*; -import org.apache.ignite.spi.indexing.*; import org.gridgain.grid.*; import org.gridgain.grid.kernal.*; import org.gridgain.grid.kernal.processors.cache.*; +import org.gridgain.grid.kernal.processors.query.*; import org.gridgain.grid.util.direct.*; import org.gridgain.grid.util.typedef.*; import org.gridgain.grid.util.typedef.internal.*; @@ -54,7 +53,7 @@ public class GridCacheQueryResponse<K, V> extends GridCacheMessage<K, V> impleme /** */ @GridToStringInclude @GridDirectTransient - private List<IndexingFieldMetadata> metadata; + private List<GridQueryFieldMetadata> metadata; /** */ @GridDirectCollection(byte[].class) @@ -105,7 +104,7 @@ public class GridCacheQueryResponse<K, V> extends GridCacheMessage<K, V> impleme errBytes = ctx.marshaller().marshal(err); metaDataBytes = marshalCollection(metadata, ctx); - dataBytes = fields ? marshalFieldsCollection(data, ctx) : marshalCollection(data, ctx); + dataBytes = marshalCollection(data, ctx); if (ctx.deploymentEnabled() && !F.isEmpty(data)) { for (Object o : data) { @@ -127,20 +126,20 @@ public class GridCacheQueryResponse<K, V> extends GridCacheMessage<K, V> impleme err = ctx.marshaller().unmarshal(errBytes, ldr); metadata = unmarshalCollection(metaDataBytes, ctx, ldr); - data = fields ? unmarshalFieldsCollection(dataBytes, ctx, ldr) : unmarshalCollection(dataBytes, ctx, ldr); + data = unmarshalCollection(dataBytes, ctx, ldr); } /** * @return Metadata. */ - public List<IndexingFieldMetadata> metadata() { + public List<GridQueryFieldMetadata> metadata() { return metadata; } /** * @param metadata Metadata. */ - public void metadata(@Nullable List<IndexingFieldMetadata> metadata) { + public void metadata(@Nullable List<GridQueryFieldMetadata> metadata) { this.metadata = metadata; } @@ -195,128 +194,6 @@ public class GridCacheQueryResponse<K, V> extends GridCacheMessage<K, V> impleme } /** {@inheritDoc} */ - @SuppressWarnings("TypeMayBeWeakened") - @Nullable private Collection<byte[]> marshalFieldsCollection(@Nullable Collection<Object> col, - GridCacheSharedContext<K, V> ctx) throws GridException { - assert ctx != null; - - if (col == null) - return null; - - Collection<List<Object>> col0 = new ArrayList<>(col.size()); - - for (Object o : col) { - List<IndexingEntity<?>> list = (List<IndexingEntity<?>>)o; - List<Object> list0 = new ArrayList<>(list.size()); - - for (IndexingEntity<?> ent : list) { - if (ent.bytes() != null) - list0.add(ent.bytes()); - else { - if (ctx.deploymentEnabled()) - prepareObject(ent.value(), ctx); - - list0.add(CU.marshal(ctx, ent.value())); - } - } - - col0.add(list0); - } - - return marshalCollection(col0, ctx); - } - - /** {@inheritDoc} */ - @SuppressWarnings("TypeMayBeWeakened") - @Nullable private Collection<Object> unmarshalFieldsCollection(@Nullable Collection<byte[]> byteCol, - GridCacheSharedContext<K, V> ctx, ClassLoader ldr) throws GridException { - assert ctx != null; - assert ldr != null; - - Collection<Object> col = unmarshalCollection(byteCol, ctx, ldr); - Collection<Object> col0 = null; - - if (col != null) { - col0 = new ArrayList<>(col.size()); - - for (Object o : col) { - List<Object> list = (List<Object>)o; - List<Object> list0 = new ArrayList<>(list.size()); - - for (Object obj : list) - list0.add(obj != null ? ctx.marshaller().unmarshal((byte[])obj, ldr) : null); - - col0.add(list0); - } - } - - return col0; - } - - /** - * @param out Object output. - * @throws IOException If failed. - */ - @SuppressWarnings("TypeMayBeWeakened") - private void writeFieldsCollection(ObjectOutput out) throws IOException { - assert fields; - - out.writeInt(data != null ? data.size() : -1); - - if (data == null) - return; - - for (Object o : data) { - List<IndexingEntity<?>> list = (List<IndexingEntity<?>>)o; - - out.writeInt(list.size()); - - for (IndexingEntity<?> idxEnt : list) { - try { - out.writeObject(idxEnt.value()); - } - catch (IgniteSpiException e) { - throw new IOException("Failed to write indexing entity: " + idxEnt, e); - } - } - } - } - - /** - * @param in Object input. - * @return Read collection. - * @throws IOException If failed. - * @throws ClassNotFoundException If failed. - */ - private Collection<Object> readFieldsCollection(ObjectInput in) throws IOException, ClassNotFoundException { - assert fields; - - int size = in.readInt(); - - if (size == -1) - return null; - - Collection<Object> res = new ArrayList<>(size); - - for (int i = 0; i < size; i++) { - int size0 = in.readInt(); - - Collection<Object> col = new ArrayList<>(size0); - - for (int j = 0; j < size0; j++) - col.add(in.readObject()); - - assert col.size() == size0; - - res.add(col); - } - - assert res.size() == size; - - return res; - } - - /** {@inheritDoc} */ @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"}) @Override public GridTcpCommunicationMessageAdapter clone() { GridCacheQueryResponse _clone = new GridCacheQueryResponse(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/922a2bab/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueryType.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueryType.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueryType.java index d57fcde..fbcd6e5 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueryType.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueryType.java @@ -19,6 +19,11 @@ import org.jetbrains.annotations.*; */ public enum GridCacheQueryType { /** + * User provided indexing SPI based query. + */ + SPI, + + /** * Fully scans cache returning only entries that pass certain filters. */ SCAN, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/922a2bab/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheSqlMetadata.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheSqlMetadata.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheSqlMetadata.java index cd47b0d..3ec5cb4 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheSqlMetadata.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheSqlMetadata.java @@ -9,6 +9,7 @@ package org.gridgain.grid.kernal.processors.cache.query; +import org.apache.ignite.spi.indexing.*; import org.jetbrains.annotations.*; import java.io.*; @@ -34,7 +35,7 @@ public interface GridCacheSqlMetadata extends Externalizable { * <p> * By default, type name is equal to simple class name * of stored object, but it can depend on implementation - * of {@link org.apache.ignite.spi.indexing.IndexingSpi}. + * of {@link GridIndexingSpi}. * * @return Collection of available types. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/922a2bab/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/jdbc/GridCacheQueryJdbcTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/jdbc/GridCacheQueryJdbcTask.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/jdbc/GridCacheQueryJdbcTask.java index ccfadec..a080192 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/jdbc/GridCacheQueryJdbcTask.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/jdbc/GridCacheQueryJdbcTask.java @@ -16,12 +16,12 @@ import org.apache.ignite.marshaller.*; import org.apache.ignite.marshaller.jdk.*; import org.apache.ignite.marshaller.optimized.*; import org.apache.ignite.resources.*; -import org.apache.ignite.spi.indexing.*; import org.gridgain.grid.*; import org.gridgain.grid.cache.*; import org.gridgain.grid.cache.query.*; import org.gridgain.grid.kernal.*; import org.gridgain.grid.kernal.processors.cache.query.*; +import org.gridgain.grid.kernal.processors.query.*; import org.gridgain.grid.util.lang.*; import org.gridgain.grid.util.typedef.*; import org.gridgain.grid.util.typedef.internal.*; @@ -199,7 +199,7 @@ public class GridCacheQueryJdbcTask extends ComputeTaskAdapter<byte[], byte[]> { GridCacheQueryFuture<List<?>> fut = qry.execute(args.toArray()); - Collection<IndexingFieldMetadata> meta = ((GridCacheQueryMetadataAware)fut).metadata().get(); + Collection<GridQueryFieldMetadata> meta = ((GridCacheQueryMetadataAware)fut).metadata().get(); if (meta == null) { // Try to extract initial SQL exception. @@ -220,7 +220,7 @@ public class GridCacheQueryJdbcTask extends ComputeTaskAdapter<byte[], byte[]> { cols = new ArrayList<>(meta.size()); types = new ArrayList<>(meta.size()); - for (IndexingFieldMetadata desc : meta) { + for (GridQueryFieldMetadata desc : meta) { tbls.add(desc.typeName()); cols.add(desc.fieldName().toUpperCase()); types.add(desc.fieldTypeName()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/922a2bab/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryFieldMetadata.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryFieldMetadata.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryFieldMetadata.java new file mode 100644 index 0000000..9799cda --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryFieldMetadata.java @@ -0,0 +1,46 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.kernal.processors.query; + +import java.io.*; + +/** + * Query field descriptor. This descriptor is used to provide metadata + * about fields returned in query result. + */ +public interface GridQueryFieldMetadata extends Externalizable { + /** + * Gets schema name. + * + * @return Schema name. + */ + public String schemaName(); + + /** + * Gets name of type to which this field belongs. + * + * @return Gets type name. + */ + public String typeName(); + + /** + * Gets field name. + * + * @return Field name. + */ + public String fieldName(); + + /** + * Gets field type name. + * + * @return Field type name. + */ + public String fieldTypeName(); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/922a2bab/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryFieldsResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryFieldsResult.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryFieldsResult.java new file mode 100644 index 0000000..ae2ab8a --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryFieldsResult.java @@ -0,0 +1,34 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.kernal.processors.query; + +import org.apache.ignite.spi.*; + +import java.util.*; + +/** + * Field query result. It is composed of + * fields metadata and iterator over queried fields. + */ +public interface GridQueryFieldsResult { + /** + * Gets metadata for queried fields. + * + * @return Meta data for queried fields. + */ + List<GridQueryFieldMetadata> metaData(); + + /** + * Gets iterator over queried fields. + * + * @return Iterator over queried fields. + */ + IgniteSpiCloseableIterator<List<?>> iterator(); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/922a2bab/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryFieldsResultAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryFieldsResultAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryFieldsResultAdapter.java new file mode 100644 index 0000000..cf5e855 --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryFieldsResultAdapter.java @@ -0,0 +1,49 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.kernal.processors.query; + +import org.gridgain.grid.util.lang.*; +import org.jetbrains.annotations.*; + +import java.util.*; + +/** + * Convenience adapter for {@link GridQueryFieldsResult}. + */ +public class GridQueryFieldsResultAdapter implements GridQueryFieldsResult { + /** Meta data. */ + private final List<GridQueryFieldMetadata> metaData; + + /** Result iterator. */ + private final GridCloseableIterator<List<?>> it; + + /** + * Creates query field result composed of field metadata and iterator + * over queried fields. + * + * @param metaData Meta data. + * @param it Result iterator. + */ + public GridQueryFieldsResultAdapter(@Nullable List<GridQueryFieldMetadata> metaData, + GridCloseableIterator<List<?>> it) { + this.metaData = metaData != null ? Collections.unmodifiableList(metaData) : null; + this.it = it; + } + + /** {@inheritDoc} */ + @Override public List<GridQueryFieldMetadata> metaData() { + return metaData; + } + + /** {@inheritDoc} */ + @Override public GridCloseableIterator<List<?>> iterator() { + return it; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/922a2bab/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryIndexDescriptor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryIndexDescriptor.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryIndexDescriptor.java new file mode 100644 index 0000000..a971123 --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryIndexDescriptor.java @@ -0,0 +1,42 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.kernal.processors.query; + +import java.util.*; + +/** + * Describes an index to be created for a certain type. It contains all necessary + * information about fields, order, uniqueness, and specified + * whether this is SQL or Text index. + * See also {@link GridQueryTypeDescriptor#indexes()}. + */ +public interface GridQueryIndexDescriptor { + /** + * Gets all fields to be indexed. + * + * @return Fields to be indexed. + */ + public Collection<String> fields(); + + /** + * Specifies order of the index for each indexed field. + * + * @param field Field name. + * @return {@code True} if given field should be indexed in descending order. + */ + public boolean descending(String field); + + /** + * Gets index type. + * + * @return Type. + */ + public GridQueryIndexType type(); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/922a2bab/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryIndexType.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryIndexType.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryIndexType.java new file mode 100644 index 0000000..313b117 --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryIndexType.java @@ -0,0 +1,24 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.kernal.processors.query; + +/** + * Index types. + */ +public enum GridQueryIndexType { + /** Sorted SQL index. */ + SORTED, + + /** Spatial SQL index. */ + GEO_SPATIAL, + + /** Fulltext index. */ + FULLTEXT +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/922a2bab/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryIndexing.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryIndexing.java new file mode 100644 index 0000000..e22bfd4 --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryIndexing.java @@ -0,0 +1,162 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.kernal.processors.query; + +import org.apache.ignite.lang.*; +import org.apache.ignite.spi.indexing.*; +import org.gridgain.grid.*; +import org.gridgain.grid.kernal.*; +import org.gridgain.grid.util.lang.*; +import org.jetbrains.annotations.*; + +import java.util.*; + +/** + * Abstraction for internal indexing implementation. + */ +public interface GridQueryIndexing { + /** + * Starts indexing. + * + * @param ctx Context. + * @throws GridException If failed. + */ + public void start(GridKernalContext ctx) throws GridException; + + /** + * Stops indexing. + * + * @throws GridException If failed. + */ + public void stop() throws GridException; + + /** + * Queries individual fields (generally used by JDBC drivers). + * + * @param spaceName Space name. + * @param qry Query. + * @param params Query parameters. + * @param filters Space name and key filters. + * @return Query result. + * @throws GridException If failed. + */ + public <K, V> GridQueryFieldsResult queryFields(@Nullable String spaceName, String qry, + Collection<Object> params, GridIndexingQueryFilter filters) throws GridException; + + /** + * Executes regular query. + * + * @param spaceName Space name. + * @param qry Query. + * @param params Query parameters. + * @param type Query return type. + * @param filters Space name and key filters. + * @return Queried rows. + * @throws GridException If failed. + */ + public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> query(@Nullable String spaceName, String qry, + Collection<Object> params, GridQueryTypeDescriptor type, GridIndexingQueryFilter filters) throws GridException; + + /** + * Executes text query. + * + * @param spaceName Space name. + * @param qry Text query. + * @param type Query return type. + * @param filters Space name and key filter. + * @return Queried rows. + * @throws GridException If failed. + */ + public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> queryText(@Nullable String spaceName, String qry, + GridQueryTypeDescriptor type, GridIndexingQueryFilter filters) throws GridException; + + /** + * Gets size of index for given type or -1 if it is a unknown type. + * + * @param spaceName Space name. + * @param desc Type descriptor. + * @param filters Filters. + * @return Objects number. + * @throws GridException If failed. + */ + public long size(@Nullable String spaceName, GridQueryTypeDescriptor desc, GridIndexingQueryFilter filters) + throws GridException; + + /** + * Registers type if it was not known before or updates it otherwise. + * + * @param spaceName Space name. + * @param desc Type descriptor. + * @throws GridException If failed. + * @return {@code True} if type was registered, {@code false} if for some reason it was rejected. + */ + public boolean registerType(@Nullable String spaceName, GridQueryTypeDescriptor desc) throws GridException; + + /** + * Unregisters type and removes all corresponding data. + * + * @param spaceName Space name. + * @param type Type descriptor. + * @throws GridException If failed. + */ + public void unregisterType(@Nullable String spaceName, GridQueryTypeDescriptor type) throws GridException; + + /** + * Updates index. Note that key is unique for space, so if space contains multiple indexes + * the key should be removed from indexes other than one being updated. + * + * @param spaceName Space name. + * @param type Value type. + * @param key Key. + * @param val Value. + * @param ver Version. + * @param expirationTime Expiration time or 0 if never expires. + * @throws GridException If failed. + */ + public void store(@Nullable String spaceName, GridQueryTypeDescriptor type, Object key, Object val, byte[] ver, + long expirationTime) throws GridException; + + /** + * Removes index entry by key. + * + * @param spaceName Space name. + * @param key Key. + * @throws GridException If failed. + */ + public void remove(@Nullable String spaceName, Object key) throws GridException; + + /** + * Will be called when entry with given key is swapped. + * + * @param spaceName Space name. + * @param key Key. + * @throws GridException If failed. + */ + public void onSwap(@Nullable String spaceName, Object key) throws GridException; + + /** + * Will be called when entry with given key is unswapped. + * + * @param spaceName Space name. + * @param key Key. + * @param val Value. + * @param valBytes Value bytes. + * @throws GridException If failed. + */ + public void onUnswap(@Nullable String spaceName, Object key, Object val, byte[] valBytes) throws GridException; + + /** + * Rebuilds all indexes of given type. + * + * @param spaceName Space name. + * @param type Type descriptor. + */ + public void rebuildIndexes(@Nullable String spaceName, GridQueryTypeDescriptor type); +}