ignite-sql-old - new api
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/3b8f9a64 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/3b8f9a64 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/3b8f9a64 Branch: refs/heads/sprint-1 Commit: 3b8f9a64db71245b307f0f028149a0ded85574c7 Parents: cb65ec5 Author: S.Vladykin <svlady...@gridgain.com> Authored: Sat Feb 14 01:55:01 2015 +0300 Committer: S.Vladykin <svlady...@gridgain.com> Committed: Sat Feb 14 01:55:01 2015 +0300 ---------------------------------------------------------------------- .../java/org/apache/ignite/IgniteCache.java | 37 ++++ .../ignite/cache/query/ContinuousQuery.java | 208 ++++++++++++++++++ .../org/apache/ignite/cache/query/Query.java | 134 ++++++++++++ .../apache/ignite/cache/query/QueryCursor.java | 45 ++++ .../apache/ignite/cache/query/ScanQuery.java | 76 +++++++ .../org/apache/ignite/cache/query/SpiQuery.java | 64 ++++++ .../ignite/cache/query/SqlFieldsQuery.java | 98 +++++++++ .../org/apache/ignite/cache/query/SqlQuery.java | 142 +++++++++++++ .../apache/ignite/cache/query/TextQuery.java | 119 +++++++++++ .../processors/cache/IgniteCacheProxy.java | 212 +++++++++++++++++++ .../processors/cache/query/QueryCursorImpl.java | 87 ++++++++ .../processors/query/GridQueryProcessor.java | 2 +- 12 files changed, 1223 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3b8f9a64/modules/core/src/main/java/org/apache/ignite/IgniteCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java index fabe3b5..c33d1a4 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java +++ 
b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java @@ -18,6 +18,7 @@ package org.apache.ignite; import org.apache.ignite.cache.*; +import org.apache.ignite.cache.query.*; import org.apache.ignite.cache.store.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.processors.cache.*; @@ -190,9 +191,45 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS */ public boolean isLocalLocked(K key, boolean byCurrThread); + /** + * Queries cache. Accepts any subclass of {@link Query}. + * + * @param qry Query. + * @return Cursor. + * @see ScanQuery + * @see SqlQuery + * @see TextQuery + * @see SpiQuery + */ + public QueryCursor<Entry<K, V>> query(Query qry); + /** + * Queries separate entry fields. + * + * @param qry SQL Query. + * @return Cursor. + */ + public QueryCursor<List<?>> queryFields(SqlFieldsQuery qry); + /** + * Queries cache locally. Accepts any subclass of {@link Query}. + * + * @param qry Query. + * @return Cursor. + * @see ScanQuery + * @see SqlQuery + * @see TextQuery + * @see SpiQuery + */ + public QueryCursor<Entry<K, V>> localQuery(Query qry); + /** + * Queries separate entry fields locally. + * + * @param qry SQL Query. + * @return Cursor. + */ + public QueryCursor<List<?>> localQueryFields(SqlFieldsQuery qry); public Iterable<Entry<K, V>> localEntries(CachePeekMode... 
peekModes) throws CacheException; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3b8f9a64/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java new file mode 100644 index 0000000..b02c65f --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.query; + +import org.apache.ignite.*; + +import javax.cache.event.*; + +/** + * API for configuring and executing continuous cache queries. + * <p> + * Continuous queries are executed as follows: + * <ol> + * <li> + * Query is sent to requested grid nodes. Note that for {@link org.apache.ignite.cache.CacheMode#LOCAL LOCAL} + * and {@link org.apache.ignite.cache.CacheMode#REPLICATED REPLICATED} caches query will be always executed + * locally. 
+ * </li> + * <li> + * Each node iterates through existing cache data and registers listeners that will + * notify about further updates. + * <li> + * Each key-value pair is passed through optional filter and if this filter returns + * true, key-value pair is sent to the master node (the one that executed query). + * If filter is not provided, all pairs are sent. + * </li> + * <li> + * When master node receives key-value pairs, it notifies the local callback. + * </li> + * </ol> + * <h2 class="header">NOTE</h2> + * Under some concurrent circumstances callback may get several notifications + * for one cache update. This should be taken into account when implementing callback. + * <h1 class="header">Query usage</h1> + * As an example, suppose we have cache with {@code 'Person'} objects and we need + * to query all persons with salary above 1000. + * <p> + * Here is the {@code Person} class: + * <pre name="code" class="java"> + * public class Person { + * // Name. + * private String name; + * + * // Salary. + * private double salary; + * + * ... + * } + * </pre> + * <p> + * You can create and execute continuous query like so: + * <pre name="code" class="java"> + * // Create new continuous query. + * qry = cache.createContinuousQuery(); + * + * // Callback that is called locally when update notifications are received. + * // It simply prints out information about all created persons. + * qry.callback(new GridPredicate2<UUID, Collection<Map.Entry<UUID, Person>>>() { + * @Override public boolean apply(UUID uuid, Collection<Map.Entry<UUID, Person>> entries) { + * for (Map.Entry<UUID, Person> e : entries) { + * Person p = e.getValue(); + * + * X.println(">>>"); + * X.println(">>> " + p.getFirstName() + " " + p.getLastName() + + * "'s salary is " + p.getSalary()); + * X.println(">>>"); + * } + * + * return true; + * } + * }); + * + * // This query will return persons with salary above 1000. 
+ * qry.filter(new GridPredicate2<UUID, Person>() { + * @Override public boolean apply(UUID uuid, Person person) { + * return person.getSalary() > 1000; + * } + * }); + * + * // Execute query. + * qry.execute(); + * </pre> + * This will execute query on all nodes that have cache you are working with and notify callback + * with both data that already exists in cache and further updates. + * <p> + * To stop receiving updates call {@link #close()} method: + * <pre name="code" class="java"> + * qry.cancel(); + * </pre> + * Note that one query instance can be executed only once. After it's cancelled, it's non-operational. + * If you need to repeat execution, use {@link org.apache.ignite.internal.processors.cache.query.CacheQueries#createContinuousQuery()} method to create + * new query. + */ +public final class ContinuousQuery<K, V> extends Query<ContinuousQuery<K,V>> implements AutoCloseable { + /** */ + private static final long serialVersionUID = 0L; + + /** + * Default buffer size. Size of {@code 1} means that all entries + * will be sent to master node immediately (buffering is disabled). + */ + public static final int DFLT_BUF_SIZE = 1; + + /** Maximum default time interval after which buffer will be flushed (if buffering is enabled). */ + public static final long DFLT_TIME_INTERVAL = 0; + + /** + * Default value for automatic unsubscription flag. Remote filters + * will be unregistered by default if master node leaves topology. + */ + public static final boolean DFLT_AUTO_UNSUBSCRIBE = true; + + public void setInitialPredicate(Query filter) { + // TODO: implement. + } + + /** + * Sets local callback. This callback is called only in local node when new updates are received. + * <p> The callback predicate accepts ID of the node from where updates are received and collection + * of received entries. Note that for removed entries value will be {@code null}. + * <p> + * If the predicate returns {@code false}, query execution will be cancelled. 
+ * <p> + * <b>WARNING:</b> all operations that involve any kind of JVM-local or distributed locking (e.g., + * synchronization or transactional cache operations), should be executed asynchronously without + * blocking the thread that called the callback. Otherwise, you can get deadlocks. + * + * @param locLsnr Local callback. + */ + public void setLocalListener(CacheEntryUpdatedListener<K, V> locLsnr) { + // TODO: implement. + } + + /** + * Sets optional key-value filter. This filter is called before entry is sent to the master node. + * <p> + * <b>WARNING:</b> all operations that involve any kind of JVM-local or distributed locking + * (e.g., synchronization or transactional cache operations), should be executed asynchronously + * without blocking the thread that called the filter. Otherwise, you can get deadlocks. + * + * @param filter Key-value filter. + */ + public void setRemoteFilter(CacheEntryEventFilter<K, V> filter) { + // TODO: implement. + } + + /** + * Sets buffer size. <p> When a cache update happens, entry is first put into a buffer. Entries from buffer will be + * sent to the master node only if the buffer is full or time provided via {@link #timeInterval(long)} method is + * exceeded. <p> Default buffer size is {@code 1} which means that entries will be sent immediately (buffering is + * disabled). + * + * @param bufSize Buffer size. + */ + public void bufferSize(int bufSize) { + // TODO: implement. + } + + /** + * Sets time interval. <p> When a cache update happens, entry is first put into a buffer. Entries from buffer will + * be sent to the master node only if the buffer is full (its size can be provided via {@link #bufferSize(int)} + * method) or time provided via this method is exceeded. <p> Default time interval is {@code 0} which means that + * time check is disabled and entries will be sent only when buffer is full. + * + * @param timeInterval Time interval. + */ + public void timeInterval(long timeInterval) { + // TODO: implement. 
+ } + + /** + * Sets automatic unsubscribe flag. <p> This flag indicates that query filters on remote nodes should be + * automatically unregistered if master node (node that initiated the query) leaves topology. If this flag is {@code + * false}, filters will be unregistered only when the query is cancelled from master node, and won't ever be + * unregistered if master node leaves grid. <p> Default value for this flag is {@code true}. + * + * @param autoUnsubscribe Automatic unsubscription flag. + */ + public void autoUnsubscribe(boolean autoUnsubscribe) { + // TODO: implement. + } + + /** + * Stops continuous query execution. <p> Note that one query instance can be executed only once. After it's + * cancelled, it's non-operational. If you need to repeat execution, use {@link + * org.apache.ignite.internal.processors.cache.query.CacheQueries#createContinuousQuery()} method to create new query. + * + * @throws IgniteCheckedException In case of error. + */ + @Override public void close() throws IgniteCheckedException { + // TODO: implement. + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3b8f9a64/modules/core/src/main/java/org/apache/ignite/cache/query/Query.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/Query.java b/modules/core/src/main/java/org/apache/ignite/cache/query/Query.java new file mode 100644 index 0000000..744d8d2 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/cache/query/Query.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.query; + +import org.apache.ignite.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.spi.indexing.*; + +import java.io.*; + +/** + * Base class for all Ignite cache queries. + * Use {@link SqlQuery} and {@link TextQuery} for SQL and + * text queries accordingly. + * <p> + * Also contains convenience shortcuts for query object construction: + * {@link #sql(Class, String)}, {@link #sql(String)}, {@link #text(Class, String)}, + * {@link #scan(IgniteBiPredicate)} and {@link #spi()}. + * + * @see IgniteCache#query(Query) + * @see IgniteCache#localQuery(Query) + * @see IgniteCache#queryFields(SqlFieldsQuery) + * @see IgniteCache#localQueryFields(SqlFieldsQuery) + */ +public abstract class Query<T extends Query> implements Serializable { + /** */ + private static final long serialVersionUID = 0L; + + /** Page size. */ + private int pageSize; + + /** + * Empty constructor. + */ + Query() { + // No-op. + } + + /** + * Factory method for SQL fields queries. + * + * @param sql SQL Query string. + * @return SQL Fields query instance. + */ + public static SqlFieldsQuery sql(String sql) { + return new SqlFieldsQuery(sql); + } + + /** + * Factory method for SQL queries. + * + * @param type Type to be queried. + * @param sql SQL Query string. + * @return SQL Query instance. 
+ */ + public static SqlQuery sql(Class<?> type, String sql) { + return new SqlQuery(type, sql); + } + + /** + * Factory method for Lucene fulltext queries. + * + * @param type Type to be queried. + * @param txt Search string. + * @return Fulltext query. + */ + public static TextQuery text(Class<?> type, String txt) { + return new TextQuery(txt).setType(type); + } + + /** + * Factory method for scan queries. + * + * @param filter Filter. + * @return Scan query. + */ + public static <K, V> ScanQuery<K, V> scan(final IgniteBiPredicate<K, V> filter) { + return new ScanQuery<>(filter); + } + + /** + * Factory method for SPI queries. + * + * @return SPI Query. + * @see IndexingSpi + */ + public static SpiQuery spi() { + return new SpiQuery(); + } + + /** + * Gets optional page size, if {@code 0}, then {@link CacheQueryConfiguration#getPageSize()} is used. + * + * @return Optional page size. + */ + public int getPageSize() { + return pageSize; + } + + /** + * Sets optional page size, if {@code 0}, then {@link CacheQueryConfiguration#getPageSize()} is used. + * + * @param pageSize Optional page size. + * @return {@code this} For chaining. 
+ */ + @SuppressWarnings("unchecked") + public T setPageSize(int pageSize) { + this.pageSize = pageSize; + + return (T)this; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(Query.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3b8f9a64/modules/core/src/main/java/org/apache/ignite/cache/query/QueryCursor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/QueryCursor.java b/modules/core/src/main/java/org/apache/ignite/cache/query/QueryCursor.java new file mode 100644 index 0000000..b0c896a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/cache/query/QueryCursor.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.query; + +import java.util.*; + +/** + * Query result cursor. Implements {@link Iterable} only for convenience, e.g. {@link #iterator()} + * can be obtained only once. Also if iteration is started then {@link #getAll()} method calls are prohibited. + * <p> + * Not thread safe and must be used from single thread only. 
+ */ +public interface QueryCursor<T> extends Iterable<T>, AutoCloseable { + /** + * Gets all query results and stores them in the collection. + * Use this method when you know in advance that query result is + * relatively small and will not cause memory utilization issues. + * <p> + * Since all the results will be fetched, all the resources will be closed + * automatically after this call, e.g. there is no need to call {@link #close()} method in this case. + * + * @return List containing all query results. + */ + public List<T> getAll(); + + /** + * Closes all resources related to this cursor. + */ + @Override public void close(); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3b8f9a64/modules/core/src/main/java/org/apache/ignite/cache/query/ScanQuery.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/ScanQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/ScanQuery.java new file mode 100644 index 0000000..af9b0be --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/cache/query/ScanQuery.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.cache.query; + +import org.apache.ignite.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; +import org.jetbrains.annotations.*; + +/** + * Scan query over cache entries. Will accept all the entries if no predicate was set. + * + * @see IgniteCache#query(Query) + * @see IgniteCache#localQuery(Query) + */ +public class ScanQuery<K, V> extends Query<ScanQuery<K, V>> { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private IgniteBiPredicate<K,V> filter; + + /** + * Create scan query returning all entries. + */ + public ScanQuery() { + this(null); + } + + /** + * Create scan query with filter. + * + * @param filter Filter. If {@code null} then all entries will be returned. + */ + public ScanQuery(@Nullable IgniteBiPredicate<K,V> filter) { + setFilter(filter); + } + + /** + * Gets filter. + * + * @return Filter. + */ + public IgniteBiPredicate<K,V> getFilter() { + return filter; + } + + /** + * Sets filter. + * + * @param filter Filter. If {@code null} then all entries will be returned. + */ + public void setFilter(@Nullable IgniteBiPredicate<K,V> filter) { + this.filter = filter; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ScanQuery.class, this); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3b8f9a64/modules/core/src/main/java/org/apache/ignite/cache/query/SpiQuery.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/SpiQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/SpiQuery.java new file mode 100644 index 0000000..34d609a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/cache/query/SpiQuery.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.query; + +import org.apache.ignite.*; +import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.spi.indexing.*; + +/** + * Query to be used by {@link IndexingSpi} implementations. + * + * @see IgniteCache#query(Query) + * @see IgniteCache#localQuery(Query) + */ +public final class SpiQuery extends Query<SpiQuery> { + /** */ + private static final long serialVersionUID = 0L; + + /** Arguments. */ + @GridToStringInclude + private Object[] args; + + /** + * Gets SQL arguments. + * + * @return SQL arguments. + */ + public Object[] getArgs() { + return args; + } + + /** + * Sets SQL arguments. + * + * @param args SQL arguments. + * @return {@code this} For chaining. + */ + public SpiQuery setArgs(Object... 
args) { + this.args = args; + + return this; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(SpiQuery.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3b8f9a64/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java new file mode 100644 index 0000000..c0733e6 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.query; + +import org.apache.ignite.*; +import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +/** + * SQL Fields query. + * + * @see IgniteCache#queryFields(SqlFieldsQuery) + * @see IgniteCache#localQueryFields(SqlFieldsQuery) + */ +public final class SqlFieldsQuery extends Query<SqlFieldsQuery>{ + /** */ + private static final long serialVersionUID = 0L; + + /** SQL Query. 
*/ + private String sql; + + /** Arguments. */ + @GridToStringInclude + private Object[] args; + + /** + * Constructs sql fields query. + * + * @param sql SQL Query. + */ + public SqlFieldsQuery(String sql) { + setSql(sql); + } + + /** + * Gets SQL clause. + * + * @return SQL clause. + */ + public String getSql() { + return sql; + } + + /** + * Sets SQL clause. + * + * @param sql SQL clause. + * @return {@code this} For chaining. + */ + public SqlFieldsQuery setSql(String sql) { + A.notNull(sql, "sql"); + + this.sql = sql; + + return this; + } + + /** + * Gets SQL arguments. + * + * @return SQL arguments. + */ + public Object[] getArgs() { + return args; + } + + /** + * Sets SQL arguments. + * + * @param args SQL arguments. + * @return {@code this} For chaining. + */ + public SqlFieldsQuery setArgs(Object... args) { + this.args = args; + + return this; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(SqlFieldsQuery.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3b8f9a64/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java new file mode 100644 index 0000000..3cb767d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.query; + +import org.apache.ignite.*; +import org.apache.ignite.internal.processors.query.*; +import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +/** + * SQL Query. + * + * @see IgniteCache#query(Query) + * @see IgniteCache#localQuery(Query) + */ +public final class SqlQuery extends Query<SqlQuery> { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private String type; + + /** SQL clause. */ + private String sql; + + /** Arguments. */ + @GridToStringInclude + private Object[] args; + + /** + * Constructs query for the given SQL query. + * + * @param sql SQL Query. + */ + public SqlQuery(String sql) { + setSql(sql); + } + + /** + * Constructs query for the given type and SQL query. + * + * @param type Type. + * @param sql SQL Query. + */ + public SqlQuery(Class<?> type, String sql) { + this(sql); + + setType(type); + } + + /** + * Gets SQL clause. + * + * @return SQL clause. + */ + public String getSql() { + return sql; + } + + /** + * Sets SQL clause. + * + * @param sql SQL clause. + * @return {@code this} For chaining. + */ + public SqlQuery setSql(String sql) { + A.notNull(sql, "sql"); + + this.sql = sql; + + return this; + } + + /** + * Gets SQL arguments. + * + * @return SQL arguments. + */ + public Object[] getArgs() { + return args; + } + + /** + * Sets SQL arguments. + * + * @param args SQL arguments. + * @return {@code this} For chaining. + */ + public SqlQuery setArgs(Object... 
args) { + this.args = args; + + return this; + } + + /** + * Gets type for query. + * + * @return Type. + */ + public String getType() { + return type; + } + + /** + * Sets type for query. + * + * @param type Type. + * @return {@code this} For chaining. + */ + public SqlQuery setType(String type) { + this.type = type; + + return this; + } + + /** + * @param type Type. + */ + public SqlQuery setType(Class<?> type) { + return setType(GridQueryProcessor.typeName(type)); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(SqlQuery.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3b8f9a64/modules/core/src/main/java/org/apache/ignite/cache/query/TextQuery.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/TextQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/TextQuery.java new file mode 100644 index 0000000..3de90a0 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/cache/query/TextQuery.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.cache.query; + +import org.apache.ignite.*; +import org.apache.ignite.internal.processors.query.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +/** + * Query for Lucene based fulltext search. + * + * @see IgniteCache#query(Query) + * @see IgniteCache#localQuery(Query) + */ +public final class TextQuery extends Query<TextQuery> { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private String type; + + /** Search string. */ + private String txt; + + /** + * Constructs query for the given search string. + * + * @param txt Search string. + */ + public TextQuery(String txt) { + setText(txt); + } + + /** + * Constructs query for the given type and search string. + * + * @param type Type. + * @param txt Search string. + */ + public TextQuery(Class<?> type, String txt) { + this(txt); + + setType(type); + } + + /** + * Gets type for query. + * + * @return Type. + */ + public String getType() { + return type; + } + + /** + * Sets type for query. + * + * @param type Type. + * @return {@code this} For chaining. + */ + public TextQuery setType(Class<?> type) { + return setType(GridQueryProcessor.typeName(type)); + } + + /** + * Sets type for query. + * + * @param type Type. + * @return {@code this} For chaining. + */ + public TextQuery setType(String type) { + this.type = type; + + return this; + } + + /** + * Gets text search string. + * + * @return Text search string. + */ + public String getText() { + return txt; + } + + /** + * Sets text search string. + * + * @param txt Text search string. + * @return {@code this} For chaining. 
+ */ + public TextQuery setText(String txt) { + A.notNull(txt, "txt"); + + this.txt = txt; + + return this; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TextQuery.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3b8f9a64/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index 9238d3c..433837d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -19,8 +19,12 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.*; import org.apache.ignite.cache.*; +import org.apache.ignite.cache.query.*; +import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.cache.query.*; +import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.*; @@ -246,6 +250,214 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } } + private IgniteBiPredicate<K,V> accepAll() { + return new IgniteBiPredicate<K,V>() { + @Override public boolean apply(K k, V v) { + return true; + } + }; + } + + /** + * @param filter Filter. + * @param grp Optional cluster group. + * @return Cursor. 
+ */ + @SuppressWarnings("unchecked") + private QueryCursor<Entry<K,V>> query(Query filter, @Nullable ClusterGroup grp) { + final CacheQuery<Map.Entry<K,V>> qry; + final CacheQueryFuture<Map.Entry<K,V>> fut; + + if (filter instanceof ScanQuery) { + IgniteBiPredicate<K,V> p = ((ScanQuery)filter).getFilter(); + + qry = delegate.queries().createScanQuery(p != null ? p : accepAll()); + + if (grp != null) + qry.projection(grp); + + fut = qry.execute(); + } + else if (filter instanceof TextQuery) { + TextQuery p = (TextQuery)filter; + + qry = delegate.queries().createFullTextQuery(p.getType(), p.getText()); + + if (grp != null) + qry.projection(grp); + + fut = qry.execute(); + } + else if (filter instanceof SpiQuery) { + qry = ((GridCacheQueriesEx)delegate.queries()).createSpiQuery(); + + if (grp != null) + qry.projection(grp); + + fut = qry.execute(((SpiQuery)filter).getArgs()); + } + else + throw new IgniteException("Unsupported query predicate: " + filter); + + return new org.apache.ignite.internal.processors.cache.QueryCursorImpl<>(new GridCloseableIteratorAdapter<Entry<K,V>>() { + /** Entry fetched by {@code onHasNext()} and not yet returned by {@code onNext()}, or {@code null}. */ + Map.Entry<K,V> cur; + + @Override protected Entry<K,V> onNext() throws IgniteCheckedException { + if (!onHasNext()) + throw new NoSuchElementException(); + + Map.Entry<K,V> e = cur; + + cur = null; + + return new CacheEntryImpl<>(e.getKey(), e.getValue()); + } + + @Override protected boolean onHasNext() throws IgniteCheckedException { + return cur != null || (cur = fut.next()) != null; + } + + @Override protected void onClose() throws IgniteCheckedException { + fut.cancel(); + } + }); + } + + /** + * @param local Enforce local. + * @return Local node cluster group if {@code local} is set or the cache is local or replicated, otherwise {@code null}. + */ + private ClusterGroup projection(boolean local) { + return local || ctx.isLocal() || ctx.isReplicated() ? 
ctx.kernalContext().grid().forLocal() : null; + } + + /** {@inheritDoc} */ + @Override public QueryCursor<Entry<K,V>> query(Query qry) { + A.notNull(qry, "qry"); + + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); + + try { + validate(qry); + + if (qry instanceof SqlQuery) { + return null; // TODO + } + + return query(qry, projection(false)); + } + catch (Exception e) { + if (e instanceof CacheException) + throw e; + + throw new CacheException(e); + } + finally { + gate.leave(prev); + } + } + + /** {@inheritDoc} */ + @Override public QueryCursor<List<?>> queryFields(SqlFieldsQuery qry) { + A.notNull(qry, "qry"); + + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); + + try { + validate(qry); + + return null; // TODO + } + catch (Exception e) { + if (e instanceof CacheException) + throw e; + + throw new CacheException(e); + } + finally { + gate.leave(prev); + } + } + + /** + * @param p SQL query. + * @return Cursor; currently always {@code null} because local SQL execution is not wired in yet. + */ + private QueryCursor<Entry<K,V>> doLocalQuery(SqlQuery p) { + return null; // TODO +// new org.apache.ignite.internal.processors.cache.QueryCursorImpl<>(ctx.kernalContext().query().<K,V>queryLocal( +// ctx.name(), p.getType(), p.getSql(), p.getArgs())); + } + + /** + * @param q SQL fields query. + * @return Cursor; currently always {@code null} because local SQL fields execution is not wired in yet. + */ + private QueryCursor<List<?>> doLocalFieldsQuery(SqlFieldsQuery q) { + return null; // TODO +// new org.apache.ignite.internal.processors.cache.QueryCursorImpl<>(ctx.kernalContext().query().queryLocalFields( +// ctx.name(), q.getSql(), q.getArgs())); + } + + /** + * Checks that the query can be executed on this cache. + * + * @param qry Query. + * @throws CacheException If query indexing is disabled and the query is not a {@link ScanQuery}.
+ */ + private void validate(Query qry) { + if (!(qry instanceof ScanQuery) && !ctx.config().isQueryIndexEnabled()) + throw new CacheException("Indexing is disabled for cache: " + ctx.cache().name()); + } + + /** {@inheritDoc} */ + @Override public QueryCursor<Entry<K,V>> localQuery(Query qry) { + A.notNull(qry, "qry"); + + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); + + try { + validate(qry); + + if (qry instanceof SqlQuery) + return doLocalQuery((SqlQuery)qry); + + return query(qry, projection(true)); + } + catch (Exception e) { + if (e instanceof CacheException) + throw e; + + throw new CacheException(e); + } + finally { + gate.leave(prev); + } + } + + /** {@inheritDoc} */ + @Override public QueryCursor<List<?>> localQueryFields(SqlFieldsQuery qry) { + A.notNull(qry, "qry"); + + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); + + try { + validate(qry); + + return doLocalFieldsQuery(qry); + } + catch (Exception e) { + if (e instanceof CacheException) + throw e; + + throw new CacheException(e); + } + finally { + gate.leave(prev); + } + } + /** {@inheritDoc} */ @Override public Iterable<Entry<K, V>> localEntries(CachePeekMode... peekModes) throws CacheException { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3b8f9a64/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/QueryCursorImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/QueryCursorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/QueryCursorImpl.java new file mode 100644 index 0000000..cc1af78 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/QueryCursorImpl.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cache.query.*; + +import java.util.*; + +/** + * Query cursor implementation wrapping a single iterator that can be taken only once. NOTE(review): the declared package lacks the {@code query} segment of this file's directory - confirm which one is intended. + */ +public class QueryCursorImpl<T> implements QueryCursor<T> { + /** Wrapped iterator; {@code null} once the cursor has been closed. */ + private Iterator<T> iter; + + /** Whether the iterator has already been handed out by {@link #iterator()}. */ + private boolean iterTaken; + + /** + * @param iter Iterator to wrap. + */ + public QueryCursorImpl(Iterator<T> iter) { + this.iter = iter; + } + + /** {@inheritDoc} */ + @Override public Iterator<T> iterator() { + if (iter == null) + throw new IgniteException("Cursor is closed."); + + if (iterTaken) + throw new IgniteException("Iterator is already taken from this cursor."); + + iterTaken = true; + + return iter; + } + + /** {@inheritDoc} */ + @Override public List<T> getAll() { + ArrayList<T> all = new ArrayList<>(); + + try { + for (T t : this) // Implicitly calls iterator() to do all checks.
+ all.add(t); + } + finally { + close(); + } + + return all; + } + + /** {@inheritDoc} Closes the wrapped iterator if it is {@link AutoCloseable}; calls after the first one do nothing. */ + @Override public void close() { + Iterator<T> i; + + if ((i = iter) != null) { + iter = null; + + if (i instanceof AutoCloseable) { + try { + ((AutoCloseable)i).close(); + } + catch (Exception e) { + throw new IgniteException(e); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3b8f9a64/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index 0b2c2f1..7972497 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -500,7 +500,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { * @param cls Class. * @return Type name. */ - public String typeName(Class<?> cls) { + public static String typeName(Class<?> cls) { String typeName = cls.getSimpleName(); // To protect from failure on anonymous classes.