http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4b8ec5f2/modules/core/src/main/java/org/gridgain/grid/cache/store/GridCacheStoreAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/cache/store/GridCacheStoreAdapter.java b/modules/core/src/main/java/org/gridgain/grid/cache/store/GridCacheStoreAdapter.java deleted file mode 100644 index c68eddb..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/cache/store/GridCacheStoreAdapter.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.cache.store; - -import org.apache.ignite.*; -import org.apache.ignite.cache.store.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.resources.*; -import org.apache.ignite.transactions.*; -import org.gridgain.grid.cache.*; -import org.jetbrains.annotations.*; - -import java.util.*; - -/** - * Cache storage convenience adapter. It provides default implementation for bulk operations, such - * as {@link #loadAll(IgniteTx, Collection, org.apache.ignite.lang.IgniteBiInClosure)}, - * {@link #putAll(IgniteTx, Map)}, and {@link #removeAll(IgniteTx, Collection)} - * by sequentially calling corresponding {@link #load(IgniteTx, Object)}, - * {@link #put(IgniteTx, Object, Object)}, and {@link #remove(IgniteTx, Object)} - * operations. Use this adapter whenever such behaviour is acceptable. However in many cases - * it maybe more preferable to take advantage of database batch update functionality, and therefore - * default adapter implementation may not be the best option. - * <p> - * Note that method {@link #loadCache(org.apache.ignite.lang.IgniteBiInClosure, Object...)} has empty - * implementation because it is essentially up to the user to invoke it with - * specific arguments. - */ -public abstract class GridCacheStoreAdapter<K, V> implements GridCacheStore<K, V> { - /** */ - @IgniteCacheSessionResource - private CacheStoreSession ses; - - /** - * @return Current session. - */ - protected CacheStoreSession session() { - return ses; - } - - /** - * Default empty implementation. This method needs to be overridden only if - * {@link GridCache#loadCache(org.apache.ignite.lang.IgniteBiPredicate, long, Object...)} method - * is explicitly called. - * - * @param clo {@inheritDoc} - * @param args {@inheritDoc} - * @throws IgniteCheckedException {@inheritDoc} - */ - @Override public void loadCache(IgniteBiInClosure<K, V> clo, Object... args) - throws IgniteCheckedException { - /* No-op. */ - } - - /** {@inheritDoc} */ - @Override public void loadAll(@Nullable IgniteTx tx, Collection<? extends K> keys, - IgniteBiInClosure<K, V> c) throws IgniteCheckedException { - assert keys != null; - - for (K key : keys) { - V v = load(tx, key); - - if (v != null) - c.apply(key, v); - } - } - - /** {@inheritDoc} */ - @Override public void putAll(IgniteTx tx, Map<? extends K, ? extends V> map) - throws IgniteCheckedException { - assert map != null; - - for (Map.Entry<? extends K, ? extends V> e : map.entrySet()) - put(tx, e.getKey(), e.getValue()); - } - - /** {@inheritDoc} */ - @Override public void removeAll(IgniteTx tx, Collection<? extends K> keys) - throws IgniteCheckedException { - assert keys != null; - - for (K key : keys) - remove(tx, key); - } - - /** - * Default empty implementation for ending transactions. Note that if explicit cache - * transactions are not used, then transactions do not have to be explicitly ended - - * for all other cases this method should be overridden with custom commit/rollback logic. - * - * @param tx {@inheritDoc} - * @param commit {@inheritDoc} - * @throws IgniteCheckedException {@inheritDoc} - */ - @Override public void txEnd(IgniteTx tx, boolean commit) throws IgniteCheckedException { - // No-op. - } -}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4b8ec5f2/modules/core/src/main/java/org/gridgain/grid/cache/store/GridCacheStoreBalancingWrapper.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/cache/store/GridCacheStoreBalancingWrapper.java b/modules/core/src/main/java/org/gridgain/grid/cache/store/GridCacheStoreBalancingWrapper.java deleted file mode 100644 index 65d39b4..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/cache/store/GridCacheStoreBalancingWrapper.java +++ /dev/null @@ -1,278 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.cache.store; - -import org.apache.ignite.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.transactions.*; -import org.gridgain.grid.util.future.*; -import org.gridgain.grid.util.typedef.*; -import org.jdk8.backport.*; -import org.jetbrains.annotations.*; - -import java.util.*; -import java.util.concurrent.*; - -/** - * Cache store wrapper that ensures that there will be no more that one thread loading value from underlying store. - */ -public class GridCacheStoreBalancingWrapper<K, V> implements GridCacheStore<K, V> { - /** */ - public static final int DFLT_LOAD_ALL_THRESHOLD = 5; - - /** Delegate store. */ - private GridCacheStore<K, V> delegate; - - /** Pending cache store loads. */ - private ConcurrentMap<K, LoadFuture> pendingLoads = new ConcurrentHashMap8<>(); - - /** Load all threshold. */ - private int loadAllThreshold = DFLT_LOAD_ALL_THRESHOLD; - - /** - * @param delegate Delegate store. - */ - public GridCacheStoreBalancingWrapper(GridCacheStore<K, V> delegate) { - this.delegate = delegate; - } - - /** - * @param delegate Delegate store. - * @param loadAllThreshold Load all threshold. - */ - public GridCacheStoreBalancingWrapper(GridCacheStore<K, V> delegate, int loadAllThreshold) { - this.delegate = delegate; - this.loadAllThreshold = loadAllThreshold; - } - - /** {@inheritDoc} */ - @Nullable @Override public V load(@Nullable IgniteTx tx, K key) throws IgniteCheckedException { - LoadFuture fut = pendingLoads.get(key); - - if (fut != null) - return fut.get(key); - - fut = new LoadFuture(); - - LoadFuture old = pendingLoads.putIfAbsent(key, fut); - - if (old != null) - return old.get(key); - - try { - V val = delegate.load(tx, key); - - fut.onComplete(key, val); - - return val; - } - catch (Throwable e) { - fut.onError(key, e); - - throw e; - } - } - - /** {@inheritDoc} */ - @Override public void loadCache(IgniteBiInClosure<K, V> clo, @Nullable Object... args) throws IgniteCheckedException { - delegate.loadCache(clo, args); - } - - /** {@inheritDoc} */ - @Override public void loadAll(@Nullable IgniteTx tx, Collection<? extends K> keys, final IgniteBiInClosure<K, V> c) - throws IgniteCheckedException { - if (keys.size() > loadAllThreshold) { - delegate.loadAll(tx, keys, c); - - return; - } - - Collection<K> needLoad = null; - Map<K, LoadFuture> pending = null; - LoadFuture span = null; - - for (K key : keys) { - LoadFuture fut = pendingLoads.get(key); - - if (fut != null) { - if (pending == null) - pending = new HashMap<>(); - - pending.put(key, fut); - } - else { - // Try to concurrently add pending future. - if (span == null) - span = new LoadFuture(); - - LoadFuture old = pendingLoads.putIfAbsent(key, span); - - if (old != null) { - if (pending == null) - pending = new HashMap<>(); - - pending.put(key, old); - } - else { - if (needLoad == null) - needLoad = new ArrayList<>(keys.size()); - - needLoad.add(key); - } - } - } - - if (needLoad != null) { - assert !needLoad.isEmpty(); - assert span != null; - - final ConcurrentMap<K, V> loaded = new ConcurrentHashMap8<>(); - - try { - delegate.loadAll(tx, needLoad, new CI2<K, V>() { - @Override public void apply(K k, V v) { - if (v != null) { - loaded.put(k, v); - - c.apply(k, v); - } - } - }); - - span.onComplete(needLoad, loaded); - } - catch (Throwable e) { - span.onError(needLoad, e); - - throw e; - } - } - - if (pending != null) { - for (Map.Entry<K, LoadFuture> e : pending.entrySet()) { - K key = e.getKey(); - - c.apply(key, e.getValue().get(key)); - } - } - } - - /** {@inheritDoc} */ - @Override public void put(@Nullable IgniteTx tx, K key, V val) throws IgniteCheckedException { - delegate.put(tx, key, val); - } - - /** {@inheritDoc} */ - @Override public void putAll(@Nullable IgniteTx tx, Map<? extends K, ? extends V> map) throws IgniteCheckedException { - delegate.putAll(tx, map); - } - - /** {@inheritDoc} */ - @Override public void remove(@Nullable IgniteTx tx, K key) throws IgniteCheckedException { - delegate.remove(tx, key); - } - - /** {@inheritDoc} */ - @Override public void removeAll(@Nullable IgniteTx tx, Collection<? extends K> keys) throws IgniteCheckedException { - delegate.removeAll(tx, keys); - } - - /** {@inheritDoc} */ - @Override public void txEnd(IgniteTx tx, boolean commit) throws IgniteCheckedException { - delegate.txEnd(tx, commit); - } - - /** - * - */ - private class LoadFuture extends GridFutureAdapter<Map<K, V>> { - /** */ - private static final long serialVersionUID = 0L; - - /** Collection of keys for pending cleanup. */ - private volatile Collection<K> keys; - - /** - * - */ - public LoadFuture() { - // No-op. - } - - /** {@inheritDoc} */ - @Override public boolean onDone(@Nullable Map<K, V> res, @Nullable Throwable err) { - if (super.onDone(res, err)) { - assert keys != null; - - for (K key : keys) - pendingLoads.remove(key, this); - - return true; - } - - return false; - } - - /** - * @param key Key. - * @param val Loaded value. - */ - public void onComplete(K key, V val) { - onComplete(Collections.singletonList(key), F.asMap(key, val)); - } - - /** - * @param keys Keys. - * @param res Loaded values. - */ - public void onComplete(Collection<K> keys, Map<K, V> res) { - this.keys = keys; - - onDone(res); - } - - /** - * @param key Key. - * @param err Error. - */ - public void onError(K key, Throwable err) { - - } - - /** - * @param keys Keys. - * @param err Error. - */ - public void onError(Collection<K> keys, Throwable err) { - this.keys = keys; - - onDone(err); - } - - /** - * Gets value loaded for key k. - * - * @param key Key to load. - * @return Loaded value (possibly {@code null}). - * @throws IgniteCheckedException If load failed. - */ - public V get(K key) throws IgniteCheckedException { - return get().get(key); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4b8ec5f2/modules/core/src/main/java/org/gridgain/grid/cache/store/jdbc/GridCacheJdbcBlobStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/cache/store/jdbc/GridCacheJdbcBlobStore.java b/modules/core/src/main/java/org/gridgain/grid/cache/store/jdbc/GridCacheJdbcBlobStore.java deleted file mode 100644 index e7f912e..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/cache/store/jdbc/GridCacheJdbcBlobStore.java +++ /dev/null @@ -1,552 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.cache.store.jdbc; - -import org.apache.ignite.*; -import org.apache.ignite.marshaller.*; -import org.apache.ignite.resources.*; -import org.apache.ignite.transactions.*; -import org.gridgain.grid.cache.store.*; -import org.gridgain.grid.util.tostring.*; -import org.gridgain.grid.util.typedef.*; -import org.gridgain.grid.util.typedef.internal.*; -import org.jdk8.backport.*; -import org.jetbrains.annotations.*; - -import javax.sql.*; -import java.sql.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -/** - * {@link GridCacheStore} implementation backed by JDBC. This implementation - * stores objects in underlying database in {@code BLOB} format. - * <p> - * Store will create table {@code ENTRIES} in the database to store data. - * Table will have {@code key} and {@code val} fields. - * <p> - * If custom DDL and DML statements are provided, table and field names have - * to be consistent for all statements and sequence of parameters have to be - * preserved. - * <h2 class="header">Configuration</h2> - * Sections below describe mandatory and optional configuration settings as well - * as providing example using Java and Spring XML. - * <h3>Mandatory</h3> - * There are no mandatory configuration parameters. - * <h3>Optional</h3> - * <ul> - * <li>Data source (see {@link #setDataSource(DataSource)}</li> - * <li>Connection URL (see {@link #setConnectionUrl(String)})</li> - * <li>User name (see {@link #setUser(String)})</li> - * <li>Password (see {@link #setPassword(String)})</li> - * <li>Create table query (see {@link #setConnectionUrl(String)})</li> - * <li>Load entry query (see {@link #setLoadQuery(String)})</li> - * <li>Update entry query (see {@link #setUpdateQuery(String)})</li> - * <li>Insert entry query (see {@link #setInsertQuery(String)})</li> - * <li>Delete entry query (see {@link #setDeleteQuery(String)})</li> - * </ul> - * <h2 class="header">Java Example</h2> - * <pre name="code" class="java"> - * ... - * GridCacheJdbcBlobStore<String, String> store = new GridCacheJdbcBlobStore<String, String>(); - * ... - * </pre> - * <h2 class="header">Spring Example</h2> - * <pre name="code" class="xml"> - * ... - * <bean id="cache.jdbc.store" - * class="org.gridgain.grid.cache.store.jdbc.GridCacheJdbcBlobStore"> - * <property name="connectionUrl" value="jdbc:h2:mem:"/> - * <property name="createTableQuery" - * value="create table if not exists ENTRIES (key other, val other)"/> - * </bean> - * ... - * </pre> - * <p> - * <img src="http://www.gridgain.com/images/spring-small.png"> - * <br> - * For information about Spring framework visit <a href="http://www.springframework.org/">www.springframework.org</a> - */ -public class GridCacheJdbcBlobStore<K, V> extends GridCacheStoreAdapter<K, V> { - /** Default connection URL (value is <tt>jdbc:h2:mem:jdbcCacheStore;DB_CLOSE_DELAY=-1</tt>). */ - public static final String DFLT_CONN_URL = "jdbc:h2:mem:jdbcCacheStore;DB_CLOSE_DELAY=-1"; - - /** - * Default create table query - * (value is <tt>create table if not exists ENTRIES (key other primary key, val other)</tt>). - */ - public static final String DFLT_CREATE_TBL_QRY = "create table if not exists ENTRIES " + - "(key binary primary key, val binary)"; - - /** Default load entry query (value is <tt>select * from ENTRIES where key=?</tt>). */ - public static final String DFLT_LOAD_QRY = "select * from ENTRIES where key=?"; - - /** Default update entry query (value is <tt>select * from ENTRIES where key=?</tt>). */ - public static final String DFLT_UPDATE_QRY = "update ENTRIES set val=? where key=?"; - - /** Default insert entry query (value is <tt>insert into ENTRIES (key, val) values (?, ?)</tt>). */ - public static final String DFLT_INSERT_QRY = "insert into ENTRIES (key, val) values (?, ?)"; - - /** Default delete entry query (value is <tt>delete from ENTRIES where key=?</tt>). */ - public static final String DFLT_DEL_QRY = "delete from ENTRIES where key=?"; - - /** Connection attribute name. */ - private static final String ATTR_CONN = "JDBC_STORE_CONNECTION"; - - /** Connection URL. */ - private String connUrl = DFLT_CONN_URL; - - /** Query to create table. */ - private String createTblQry = DFLT_CREATE_TBL_QRY; - - /** Query to load entry. */ - private String loadQry = DFLT_LOAD_QRY; - - /** Query to update entry. */ - private String updateQry = DFLT_UPDATE_QRY; - - /** Query to insert entries. */ - private String insertQry = DFLT_INSERT_QRY; - - /** Query to delete entries. */ - private String delQry = DFLT_DEL_QRY; - - /** User name for database access. */ - private String user; - - /** Password for database access. */ - @GridToStringExclude - private String passwd; - - /** Data source. */ - private DataSource dataSrc; - - /** Flag for schema initialization. */ - private boolean initSchema = true; - - /** Log. */ - @IgniteLoggerResource - private IgniteLogger log; - - /** Marshaller. */ - @IgniteMarshallerResource - private IgniteMarshaller marsh; - - /** Init guard. */ - @GridToStringExclude - private final AtomicBoolean initGuard = new AtomicBoolean(); - - /** Init latch. */ - @GridToStringExclude - private final CountDownLatch initLatch = new CountDownLatch(1); - - /** Opened connections. */ - @GridToStringExclude - private final LongAdder opened = new LongAdder(); - - /** Closed connections. */ - @GridToStringExclude - private final LongAdder closed = new LongAdder(); - - /** Test mode flag. */ - @GridToStringExclude - private boolean testMode; - - /** Successful initialization flag. */ - private boolean initOk; - - /** {@inheritDoc} */ - @Override public void txEnd(IgniteTx tx, boolean commit) throws IgniteCheckedException { - init(); - - Connection conn = tx.removeMeta(ATTR_CONN); - - if (conn != null) { - try { - if (commit) - conn.commit(); - else - conn.rollback(); - } - catch (SQLException e) { - throw new IgniteCheckedException("Failed to end transaction [xid=" + tx.xid() + ", commit=" + commit + ']', e); - } - finally { - closeConnection(conn); - } - } - - if (log.isDebugEnabled()) - log.debug("Transaction ended [xid=" + tx.xid() + ", commit=" + commit + ']'); - } - - /** {@inheritDoc} */ - @SuppressWarnings({"RedundantTypeArguments"}) - @Override public V load(@Nullable IgniteTx tx, K key) throws IgniteCheckedException { - init(); - - if (log.isDebugEnabled()) - log.debug("Store load [key=" + key + ", tx=" + tx + ']'); - - Connection conn = null; - - PreparedStatement stmt = null; - - try { - conn = connection(tx); - - stmt = conn.prepareStatement(loadQry); - - stmt.setObject(1, toBytes(key)); - - ResultSet rs = stmt.executeQuery(); - - if (rs.next()) - return fromBytes(rs.getBytes(2)); - } - catch (SQLException e) { - throw new IgniteCheckedException("Failed to load object: " + key, e); - } - finally { - end(tx, conn, stmt); - } - - return null; - } - - /** {@inheritDoc} */ - @Override public void put(@Nullable IgniteTx tx, K key, V val) throws IgniteCheckedException { - init(); - - if (log.isDebugEnabled()) - log.debug("Store put [key=" + key + ", val=" + val + ", tx=" + tx + ']'); - - Connection conn = null; - - PreparedStatement stmt = null; - - try { - conn = connection(tx); - - stmt = conn.prepareStatement(updateQry); - - stmt.setObject(1, toBytes(val)); - stmt.setObject(2, toBytes(key)); - - if (stmt.executeUpdate() == 0) { - stmt.close(); - - stmt = conn.prepareStatement(insertQry); - - stmt.setObject(1, toBytes(key)); - stmt.setObject(2, toBytes(val)); - - stmt.executeUpdate(); - } - } - catch (SQLException e) { - throw new IgniteCheckedException("Failed to put object [key=" + key + ", val=" + val + ']', e); - } - finally { - end(tx, conn, stmt); - } - } - - /** {@inheritDoc} */ - @Override public void remove(@Nullable IgniteTx tx, K key) throws IgniteCheckedException { - init(); - - if (log.isDebugEnabled()) - log.debug("Store remove [key=" + key + ", tx=" + tx + ']'); - - Connection conn = null; - - PreparedStatement stmt = null; - - try { - conn = connection(tx); - - stmt = conn.prepareStatement(delQry); - - stmt.setObject(1, toBytes(key)); - - stmt.executeUpdate(); - } - catch (SQLException e) { - throw new IgniteCheckedException("Failed to remove object: " + key, e); - } - finally { - end(tx, conn, stmt); - } - } - - /** - * @param tx Cache transaction. - * @return Connection. - * @throws SQLException In case of error. - */ - private Connection connection(@Nullable IgniteTx tx) throws SQLException { - if (tx != null) { - Connection conn = tx.meta(ATTR_CONN); - - if (conn == null) { - conn = openConnection(false); - - // Store connection in transaction metadata, so it can be accessed - // for other operations on the same transaction. - tx.addMeta(ATTR_CONN, conn); - } - - return conn; - } - // Transaction can be null in case of simple load operation. - else - return openConnection(true); - } - - /** - * Closes allocated resources depending on transaction status. - * - * @param tx Active transaction, if any. - * @param conn Allocated connection. - * @param st Created statement, - */ - private void end(@Nullable IgniteTx tx, Connection conn, Statement st) { - U.closeQuiet(st); - - if (tx == null) - // Close connection right away if there is no transaction. - closeConnection(conn); - } - - /** - * Gets connection from a pool. - * - * @param autocommit {@code true} If connection should use autocommit mode. - * @return Pooled connection. - * @throws SQLException In case of error. - */ - private Connection openConnection(boolean autocommit) throws SQLException { - Connection conn = dataSrc != null ? dataSrc.getConnection() : - DriverManager.getConnection(connUrl, user, passwd); - - if (testMode) - opened.increment(); - - conn.setAutoCommit(autocommit); - - return conn; - } - - /** - * Closes connection. - * - * @param conn Connection to close. - */ - private void closeConnection(Connection conn) { - U.closeQuiet(conn); - - if (testMode) - closed.increment(); - } - - /** - * Initializes store. - * - * @throws IgniteCheckedException If failed to initialize. - */ - private void init() throws IgniteCheckedException { - if (initLatch.getCount() > 0) { - if (initGuard.compareAndSet(false, true)) { - if (log.isDebugEnabled()) - log.debug("Initializing cache store."); - - if (F.isEmpty(connUrl)) - throw new IgniteCheckedException("Failed to initialize cache store (connection URL is not provided)."); - - if (!initSchema) { - initLatch.countDown(); - - return; - } - - if (F.isEmpty(createTblQry)) - throw new IgniteCheckedException("Failed to initialize cache store (create table query is not provided)."); - - Connection conn = null; - - Statement stmt = null; - - try { - conn = openConnection(false); - - stmt = conn.createStatement(); - - stmt.execute(createTblQry); - - conn.commit(); - - initOk = true; - } - catch (SQLException e) { - throw new IgniteCheckedException("Failed to create database table.", e); - } - finally { - U.closeQuiet(stmt); - - closeConnection(conn); - - initLatch.countDown(); - } - } - else - U.await(initLatch); - } - - if (!initOk) - throw new IgniteCheckedException("Cache store was not properly initialized."); - } - - /** - * Flag indicating whether DB schema should be initialized by GridGain (default behaviour) or - * was explicitly created by user. - * - * @param initSchema {@code True} if DB schema should be initialized by GridGain (default behaviour), - * {code @false} if schema was explicitly created by user. - */ - public void setInitSchema(boolean initSchema) { - this.initSchema = initSchema; - } - - /** - * Sets connection URL. - * - * @param connUrl Connection URL. - */ - public void setConnectionUrl(String connUrl) { - this.connUrl = connUrl; - } - - /** - * Sets create table query. - * - * @param createTblQry Create table query. - */ - public void setCreateTableQuery(String createTblQry) { - this.createTblQry = createTblQry; - } - - /** - * Sets load query. - * - * @param loadQry Load query - */ - public void setLoadQuery(String loadQry) { - this.loadQry = loadQry; - } - - /** - * Sets update entry query. - * - * @param updateQry Update entry query. - */ - public void setUpdateQuery(String updateQry) { - this.updateQry = updateQry; - } - - /** - * Sets insert entry query. - * - * @param insertQry Insert entry query. - */ - public void setInsertQuery(String insertQry) { - this.insertQry = insertQry; - } - - /** - * Sets delete entry query. - * - * @param delQry Delete entry query. - */ - public void setDeleteQuery(String delQry) { - this.delQry = delQry; - } - - /** - * Sets user name for database access. - * - * @param user User name. - */ - public void setUser(String user) { - this.user = user; - } - - /** - * Sets password for database access. - * - * @param passwd Password. - */ - public void setPassword(String passwd) { - this.passwd = passwd; - } - - /** - * Sets data source. Data source should be fully configured and ready-to-use. - * <p> - * Note that if data source is provided, all connections will be - * acquired via this data source. If data source is not provided, a new connection - * will be created for each store call ({@code connectionUrl}, - * {@code user} and {@code password} parameters will be used). - * - * @param dataSrc Data source. - */ - public void setDataSource(DataSource dataSrc) { - this.dataSrc = dataSrc; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridCacheJdbcBlobStore.class, this, "passwd", passwd != null ? "*" : null); - } - - /** - * Serialize object to byte array using marshaller. - * - * @param obj Object to convert to byte array. - * @return Byte array. - * @throws IgniteCheckedException If failed to convert. - */ - protected byte[] toBytes(Object obj) throws IgniteCheckedException { - return marsh.marshal(obj); - } - - /** - * Deserialize object from byte array using marshaller. - * - * @param bytes Bytes to deserialize. - * @param <X> Result object type. - * @return Deserialized object. - * @throws IgniteCheckedException If failed. - */ - protected <X> X fromBytes(byte[] bytes) throws IgniteCheckedException { - if (bytes == null || bytes.length == 0) - return null; - - return marsh.unmarshal(bytes, getClass().getClassLoader()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4b8ec5f2/modules/core/src/main/java/org/gridgain/grid/cache/store/jdbc/package.html ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/cache/store/jdbc/package.html b/modules/core/src/main/java/org/gridgain/grid/cache/store/jdbc/package.html deleted file mode 100644 index 50755cd..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/cache/store/jdbc/package.html +++ /dev/null @@ -1,24 +0,0 @@ -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - --> - -<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> -<html> -<body> - <!-- Package description. --> - Contains reference JDBC-based cache store implementation. -</body> -</html> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4b8ec5f2/modules/core/src/main/java/org/gridgain/grid/cache/store/package.html ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/cache/store/package.html b/modules/core/src/main/java/org/gridgain/grid/cache/store/package.html deleted file mode 100644 index 8f597d7..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/cache/store/package.html +++ /dev/null @@ -1,23 +0,0 @@ -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - --> -<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> -<html> -<body> - <!-- Package description. --> - Contains cache store interfaces. -</body> -</html> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4b8ec5f2/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheEntryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheEntryImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheEntryImpl.java new file mode 100644 index 0000000..cae6265 --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheEntryImpl.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.gridgain.grid.kernal.processors.cache; + +import javax.cache.*; + +/** + * + */ +public class CacheEntryImpl<K, V> implements Cache.Entry<K, V> { + /** */ + private final K key; + + /** */ + private final V val; + + /** + * @param key Key. + * @param val Value. + */ + public CacheEntryImpl(K key, V val) { + this.key = key; + this.val = val; + } + + /** {@inheritDoc} */ + @Override public K getKey() { + return key; + } + + /** {@inheritDoc} */ + @Override public V getValue() { + return val; + } + + /** {@inheritDoc} */ + @Override public <T> T unwrap(Class<T> clazz) { + throw new IllegalArgumentException(); + } + + /** {@inheritDoc} */ + public String toString() { + return "CacheEntry [key=" + key + ", val=" + val + ']'; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4b8ec5f2/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheLoaderWriterStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheLoaderWriterStore.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheLoaderWriterStore.java index d787c94..69b1ea4 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheLoaderWriterStore.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheLoaderWriterStore.java @@ -18,22 +18,19 @@ package org.gridgain.grid.kernal.processors.cache; import org.apache.ignite.*; +import org.apache.ignite.cache.store.*; import org.apache.ignite.lang.*; import org.apache.ignite.lifecycle.*; -import org.apache.ignite.transactions.*; -import org.gridgain.grid.cache.store.*; -import org.gridgain.grid.util.typedef.*; -import org.gridgain.grid.util.typedef.internal.*; import org.jetbrains.annotations.*; -import javax.cache.*; +import javax.cache.Cache; import javax.cache.integration.*; import java.util.*; /** * Store implementation wrapping {@link CacheLoader} and {@link CacheWriter}. */ -class GridCacheLoaderWriterStore<K, V> implements GridCacheStore<K, V>, LifecycleAware { +class GridCacheLoaderWriterStore<K, V> implements CacheStore<K, V>, LifecycleAware { /** */ private final CacheLoader<K, V> ldr; @@ -75,7 +72,7 @@ class GridCacheLoaderWriterStore<K, V> implements GridCacheStore<K, V>, Lifecycl } /** {@inheritDoc} */ - @Nullable @Override public V load(@Nullable IgniteTx tx, K key) throws IgniteCheckedException { + @Nullable @Override public V load(K key) { if (ldr == null) return null; @@ -83,46 +80,31 @@ class GridCacheLoaderWriterStore<K, V> implements GridCacheStore<K, V>, Lifecycl } /** {@inheritDoc} */ - @Override public void loadAll(@Nullable IgniteTx tx, Collection<? extends K> keys, IgniteBiInClosure<K, V> c) - throws IgniteCheckedException { + @Override public Map<K, V> loadAll(Iterable<? extends K> keys) { if (ldr == null) - return; - - Map<K, V> map = ldr.loadAll(keys); + return Collections.emptyMap(); - if (map != null) { - for (Map.Entry<K, V> e : map.entrySet()) - c.apply(e.getKey(), e.getValue()); - } + return ldr.loadAll(keys); } /** {@inheritDoc} */ - @Override public void put(@Nullable IgniteTx tx, K key, V val) throws IgniteCheckedException { + @Override public void write(Cache.Entry<? extends K, ? extends V> entry) { if (writer == null) return; - writer.write(new KeyValueEntry<>(key, val)); + writer.write(entry); } /** {@inheritDoc} */ - @Override public void putAll(@Nullable IgniteTx tx, Map<? extends K, ? extends V> map) - throws IgniteCheckedException { + @Override public void writeAll(Collection<Cache.Entry<? extends K, ? extends V>> entries) { if (writer == null) return; - Collection<Cache.Entry<? extends K, ? extends V>> col = - F.viewReadOnly(map.entrySet(), new C1<Map.Entry<? extends K, ? extends V>, Cache.Entry<? extends K, ? extends V>>() { - @Override - public Cache.Entry<? extends K, ? extends V> apply(Map.Entry<? extends K, ? extends V> e) { - return new MapEntry<>(e); - } - }); - - writer.writeAll(col); + writer.writeAll(entries); } /** {@inheritDoc} */ - @Override public void remove(@Nullable IgniteTx tx, K key) throws IgniteCheckedException { + @Override public void delete(Object key) { if (writer == null) return; @@ -130,7 +112,7 @@ class GridCacheLoaderWriterStore<K, V> implements GridCacheStore<K, V>, Lifecycl } /** {@inheritDoc} */ - @Override public void removeAll(@Nullable IgniteTx tx, Collection<? extends K> keys) throws IgniteCheckedException { + @Override public void deleteAll(Collection<?> keys) { if (writer == null) return; @@ -138,82 +120,7 @@ class GridCacheLoaderWriterStore<K, V> implements GridCacheStore<K, V>, Lifecycl } /** {@inheritDoc} */ - @Override public void txEnd(IgniteTx tx, boolean commit) throws IgniteCheckedException { + @Override public void txEnd(boolean commit) { // No-op. } - - /** - * - */ - private static class KeyValueEntry<K, V> implements Cache.Entry<K, V> { - /** */ - private final K key; - - /** */ - private final V val; - - /** - * @param key Key. - * @param val Value. - */ - KeyValueEntry(K key, V val) { - this.key = key; - this.val = val; - } - - /** {@inheritDoc} */ - @Override public K getKey() { - return key; - } - - /** {@inheritDoc} */ - @Override public V getValue() { - return val; - } - - /** {@inheritDoc} */ - @Override public <T> T unwrap(Class<T> clazz) { - throw new IllegalArgumentException(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(KeyValueEntry.class, this); - } - } - - /** - * - */ - static class MapEntry<K, V> implements Cache.Entry<K, V> { - /** */ - private final Map.Entry<K, V> e; - - /** - * @param e Entry. - */ - MapEntry(Map.Entry<K, V> e) { - this.e = e; - } - - /** {@inheritDoc} */ - @Override public K getKey() { - return e.getKey(); - } - - /** {@inheritDoc} */ - @Override public V getValue() { - return e.getValue(); - } - - /** {@inheritDoc} */ - @Override public <T> T unwrap(Class<T> clazz) { - throw new IllegalArgumentException(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(MapEntry.class, this); - } - } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4b8ec5f2/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java index 7809509..2602510 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java @@ -18,6 +18,7 @@ package org.gridgain.grid.kernal.processors.cache; import org.apache.ignite.*; +import org.apache.ignite.cache.store.*; import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; import org.apache.ignite.fs.*; @@ -30,7 +31,6 @@ import org.gridgain.grid.cache.affinity.*; import org.gridgain.grid.cache.affinity.consistenthash.*; import org.gridgain.grid.cache.affinity.fair.*; import org.gridgain.grid.cache.affinity.rendezvous.*; -import org.gridgain.grid.cache.store.*; import org.gridgain.grid.kernal.*; import org.gridgain.grid.kernal.processors.*; import org.gridgain.grid.kernal.processors.cache.datastructures.*; @@ -601,7 +601,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { GridCacheTtlManager ttlMgr = new GridCacheTtlManager(); GridCacheDrManager drMgr = ctx.createComponent(GridCacheDrManager.class); - GridCacheStore store = cacheStore(ctx.gridName(), cfg); + CacheStore store = cacheStore(ctx.gridName(), cfg); GridCacheStoreManager storeMgr = new GridCacheStoreManager(ctx, store); @@ -1811,7 +1811,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { * or user-defined cache store. */ @SuppressWarnings({"unchecked"}) - private GridCacheStore cacheStore(String gridName, GridCacheConfiguration cfg) { + private CacheStore cacheStore(String gridName, GridCacheConfiguration cfg) { if (cfg.getStore() == null || !cfg.isWriteBehindEnabled()) return cfg.getStore(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4b8ec5f2/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionEx.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionEx.java index 49077eb..df309e4 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionEx.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionEx.java @@ -21,7 +21,6 @@ import org.apache.ignite.*; import org.apache.ignite.lang.*; import org.apache.ignite.transactions.*; import org.gridgain.grid.cache.*; -import org.gridgain.grid.cache.store.*; import org.gridgain.grid.kernal.processors.cache.dr.*; import org.jetbrains.annotations.*; @@ -193,8 +192,8 @@ public interface GridCacheProjectionEx<K, V> extends GridCacheProjection<K, V> { * <p> * This method will return {@code true} if value is stored in cache and {@code false} otherwise. * <p> - * If write-through is enabled, the stored value will be persisted to {@link GridCacheStore} - * via {@link GridCacheStore#put(IgniteTx, Object, Object)} method. + * If write-through is enabled, the stored value will be persisted to {@link org.apache.ignite.cache.store.CacheStore} + * via {@link org.apache.ignite.cache.store.CacheStore#put(IgniteTx, Object, Object)} method. * <h2 class="header">Transactions</h2> * This method is transactional and will enlist the entry into ongoing transaction * if there is one. @@ -218,8 +217,8 @@ public interface GridCacheProjectionEx<K, V> extends GridCacheProjection<K, V> { * <p> * This method will return {@code true} if value is stored in cache and {@code false} otherwise. * <p> - * If write-through is enabled, the stored value will be persisted to {@link GridCacheStore} - * via {@link GridCacheStore#put(IgniteTx, Object, Object)} method. + * If write-through is enabled, the stored value will be persisted to {@link org.apache.ignite.cache.store.CacheStore} + * via {@link org.apache.ignite.cache.store.CacheStore#put(IgniteTx, Object, Object)} method. * <h2 class="header">Transactions</h2> * This method is transactional and will enlist the entry into ongoing transaction * if there is one. @@ -240,8 +239,8 @@ public interface GridCacheProjectionEx<K, V> extends GridCacheProjection<K, V> { /** * Removes given key mapping from cache if one exists and value is equal to the passed in value. * <p> - * If write-through is enabled, the value will be removed from {@link GridCacheStore} - * via {@link GridCacheStore#remove(IgniteTx, Object)} method. + * If write-through is enabled, the value will be removed from {@link org.apache.ignite.cache.store.CacheStore} + * via {@link org.apache.ignite.cache.store.CacheStore#remove(IgniteTx, Object)} method. * <h2 class="header">Transactions</h2> * This method is transactional and will enlist the entry into ongoing transaction * if there is one. @@ -264,8 +263,8 @@ public interface GridCacheProjectionEx<K, V> extends GridCacheProjection<K, V> { * This method will return {@code true} if remove did occur, which means that all optionally * provided filters have passed and there was something to remove, {@code false} otherwise. * <p> - * If write-through is enabled, the value will be removed from {@link GridCacheStore} - * via {@link GridCacheStore#remove(IgniteTx, Object)} method. + * If write-through is enabled, the value will be removed from {@link org.apache.ignite.cache.store.CacheStore} + * via {@link org.apache.ignite.cache.store.CacheStore#remove(IgniteTx, Object)} method. * <h2 class="header">Transactions</h2> * This method is transactional and will enlist the entry into ongoing transaction * if there is one. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4b8ec5f2/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheStoreManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheStoreManager.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheStoreManager.java index 469671a..e943a11 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheStoreManager.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheStoreManager.java @@ -23,14 +23,15 @@ import org.apache.ignite.lang.*; import org.apache.ignite.lifecycle.*; import org.apache.ignite.resources.*; import org.apache.ignite.transactions.*; -import org.gridgain.grid.cache.store.*; import org.gridgain.grid.kernal.*; import org.gridgain.grid.kernal.processors.interop.*; import org.gridgain.grid.util.lang.*; import org.gridgain.grid.util.typedef.*; import org.gridgain.grid.util.typedef.internal.*; import org.jetbrains.annotations.*; +import sun.nio.cs.*; +import javax.cache.*; import java.util.*; /** @@ -41,15 +42,18 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> { private static final String SES_ATTR = "STORE_SES"; /** */ - private final GridCacheStore<K, Object> store; + private final CacheStore<K, Object> store; /** */ - private final GridCacheStoreBalancingWrapper<K, Object> singleThreadGate; + private final CacheStoreBalancingWrapper<K, Object> singleThreadGate; /** */ private final ThreadLocal<SessionData> sesHolder = new ThreadLocal<>(); /** */ + private final boolean sesEnabled; + + /** */ private final boolean locStore; /** */ @@ -61,19 +65,24 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> { * @throws IgniteCheckedException In case of error. */ @SuppressWarnings("unchecked") - public GridCacheStoreManager(GridKernalContext ctx, @Nullable GridCacheStore<K, Object> store) + public GridCacheStoreManager(GridKernalContext ctx, @Nullable CacheStore<K, Object> store) throws IgniteCheckedException { this.store = store; - singleThreadGate = store == null ? null : new GridCacheStoreBalancingWrapper<>(store); + singleThreadGate = store == null ? null : new CacheStoreBalancingWrapper<>(store); if (store instanceof GridCacheWriteBehindStore) store = ((GridCacheWriteBehindStore)store).store(); - if (store != null) + if (store != null) { ctx.resource().injectBasicResource(store, IgniteCacheSessionResource.class, new ThreadLocalSession()); - locStore = U.hasAnnotation(store, GridCacheLocalStore.class); + sesEnabled = true; // TODO IGNITE-42. + } + else + sesEnabled = false; + + locStore = U.hasAnnotation(store, CacheLocalStore.class); } /** {@inheritDoc} */ @@ -162,7 +171,7 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> { boolean ses = initSession(tx); try { - val = convert(singleThreadGate.load(tx, key)); + val = convert(singleThreadGate.load(key)); } catch (ClassCastException e) { handleClassCastException(e); @@ -236,9 +245,13 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> { boolean ses = initSession(tx); try { - singleThreadGate.loadAll(tx, keys0, new CI2<K, Object>() { - @Override public void apply(K k, Object o) { - V v = convert(o); + if (keys.size() > singleThreadGate.loadAllThreshold()) { + Map<K, Object> map = singleThreadGate.loadAll(keys0); + + for (Map.Entry<K, Object> e : map.entrySet()) { + K k = e.getKey(); + + V v = convert(e.getValue()); if (cctx.portableEnabled()) { k = (K)cctx.marshalToPortable(k); @@ -247,7 +260,21 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> { vis.apply(k, v); } - }); + } + else { + singleThreadGate.loadAll(keys0, new CI2<K, Object>() { + @Override public void apply(K k, Object o) { + V v = convert(o); + + if (cctx.portableEnabled()) { + k = (K)cctx.marshalToPortable(k); + v = (V)cctx.marshalToPortable(v); + } + + vis.apply(k, v); + } + }); + } } catch (ClassCastException e) { handleClassCastException(e); @@ -256,7 +283,8 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> { throw U.cast(e); } finally { - sesHolder.set(null); + if (ses) + sesHolder.set(null); } if (log.isDebugEnabled()) @@ -354,7 +382,7 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> { boolean ses = initSession(tx); try { - store.put(tx, key, locStore ? F.t(val, ver) : val); + store.write(new CacheEntryImpl<>(key, locStore ? F.t(val, ver) : val)); } catch (ClassCastException e) { handleClassCastException(e); @@ -414,18 +442,21 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> { boolean ses = initSession(tx); try { - store.putAll(tx, locStore ? map0 : F.viewReadOnly(map0, - new C1<IgniteBiTuple<V, GridCacheVersion>, Object>() { - @Override public Object apply(IgniteBiTuple<V, GridCacheVersion> t) { - return t.get1(); + C1<Map.Entry<K, IgniteBiTuple<V, GridCacheVersion>>, Cache.Entry<? extends K, ?>> c = + new C1<Map.Entry<K, IgniteBiTuple<V, GridCacheVersion>>, Cache.Entry<? extends K, ?>>() { + @Override public Cache.Entry<? extends K, ?> apply(Map.Entry<K, IgniteBiTuple<V, GridCacheVersion>> e) { + return new CacheEntryImpl<>(e.getKey(), locStore ? e.getValue() : e.getValue().get1()); } - })); + }; + + store.writeAll(F.viewReadOnly(map.entrySet(), c)); } catch (ClassCastException e) { handleClassCastException(e); } finally { - sesHolder.set(null); + if (ses) + sesHolder.set(null); } if (log.isDebugEnabled()) @@ -459,7 +490,7 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> { boolean ses = initSession(tx); try { - store.remove(tx, key); + store.delete(key); } catch (ClassCastException e) { handleClassCastException(e); @@ -509,7 +540,7 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> { boolean ses = initSession(tx); try { - store.removeAll(tx, keys0); + store.deleteAll(keys0); } catch (ClassCastException e) { handleClassCastException(e); @@ -531,7 +562,7 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> { /** * @return Store. */ - public GridCacheStore<K, Object> store() { + public CacheStore<K, Object> store() { return store; } @@ -551,9 +582,18 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> { public void txEnd(IgniteTx tx, boolean commit) throws IgniteCheckedException { assert store != null; - tx.removeMeta(SES_ATTR); + boolean ses = initSession(tx); + + try { + store.txEnd(commit); + } + finally { + if (ses) { + sesHolder.set(null); - store.txEnd(tx, commit); + tx.removeMeta(SES_ATTR); + } + } } /** @@ -577,7 +617,7 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> { * @return {@code True} if */ private boolean initSession(@Nullable IgniteTx tx) { - if (tx == null) + if (!sesEnabled || tx == null) return false; SessionData ses = tx.meta(SES_ATTR); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4b8ec5f2/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheWriteBehindStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheWriteBehindStore.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheWriteBehindStore.java index 9cdae01..fea67a9 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheWriteBehindStore.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheWriteBehindStore.java @@ -18,13 +18,13 @@ package org.gridgain.grid.kernal.processors.cache; import org.apache.ignite.*; +import org.apache.ignite.cache.store.*; import org.apache.ignite.lang.*; import org.apache.ignite.lifecycle.*; import org.apache.ignite.thread.*; import org.apache.ignite.transactions.*; import org.gridgain.grid.*; import org.gridgain.grid.cache.*; -import org.gridgain.grid.cache.store.*; import org.gridgain.grid.kernal.*; import org.gridgain.grid.kernal.processors.interop.*; import org.gridgain.grid.util.typedef.*; @@ -34,13 +34,14 @@ import org.gridgain.grid.util.worker.*; import org.jdk8.backport.*; import org.jetbrains.annotations.*; +import javax.cache.integration.*; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; import java.util.concurrent.locks.*; /** - * Internal wrapper for a {@link GridCacheStore} that enables write-behind logic. + * Internal wrapper for a {@link org.apache.ignite.cache.store.CacheStore} that enables write-behind logic. * <p/> * The general purpose of this approach is to reduce cache store load under high * store update rate. The idea is to cache all write and remove operations in a pending @@ -55,7 +56,7 @@ import java.util.concurrent.locks.*; * Since write operations to the cache store are deferred, transaction support is lost; no * transaction objects are passed to the underlying store. */ -public class GridCacheWriteBehindStore<K, V> implements GridCacheStore<K, V>, LifecycleAware, GridInteropAware { +public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, LifecycleAware, GridInteropAware { /** Default write cache initial capacity. */ public static final int DFLT_INITIAL_CAPACITY = 1024; @@ -93,7 +94,7 @@ public class GridCacheWriteBehindStore<K, V> implements GridCacheStore<K, V>, Li private String cacheName; /** Underlying store. */ - private GridCacheStore<K, V> store; + private CacheStore<K, V> store; /** Write cache. */ private ConcurrentLinkedHashMap<K, StatefulValue<V>> writeCache; @@ -241,7 +242,7 @@ public class GridCacheWriteBehindStore<K, V> implements GridCacheStore<K, V>, Li * @param log Grid logger. * @param store {@code GridCacheStore} that need to be wrapped. */ - public GridCacheWriteBehindStore(String gridName, String cacheName, IgniteLogger log, GridCacheStore<K, V> store) { + public GridCacheWriteBehindStore(String gridName, String cacheName, IgniteLogger log, CacheStore<K, V> store) { this.gridName = gridName; this.cacheName = cacheName; this.log = log; @@ -251,7 +252,7 @@ public class GridCacheWriteBehindStore<K, V> implements GridCacheStore<K, V>, Li /** * @return Underlying store. */ - public GridCacheStore<K, V> store() { + public CacheStore<K, V> store() { return store; } @@ -377,19 +378,16 @@ public class GridCacheWriteBehindStore<K, V> implements GridCacheStore<K, V>, Li * * @param clo {@inheritDoc} * @param args {@inheritDoc} - * @throws IgniteCheckedException {@inheritDoc} */ - @Override public void loadCache(IgniteBiInClosure<K, V> clo, @Nullable Object... args) - throws IgniteCheckedException { + @Override public void loadCache(IgniteBiInClosure<K, V> clo, @Nullable Object... args) { store.loadCache(clo, args); } /** {@inheritDoc} */ @SuppressWarnings({"NullableProblems"}) - @Override public void loadAll(@Nullable IgniteTx tx, - @Nullable Collection<? extends K> keys, IgniteBiInClosure<K, V> c) throws IgniteCheckedException { + @Override public void loadAll(@Nullable Collection<? extends K> keys, IgniteBiInClosure<K, V> c) { if (log.isDebugEnabled()) - log.debug("Store load all [keys=" + keys + ", tx=" + tx + ']'); + log.debug("Store load all [keys=" + keys + ']'); Collection<K> remaining = new LinkedList<>(); @@ -425,13 +423,13 @@ public class GridCacheWriteBehindStore<K, V> implements GridCacheStore<K, V>, Li // For items that were not found in queue. if (!remaining.isEmpty()) - store.loadAll(null, remaining, c); + store.loadAll(remaining, c); } /** {@inheritDoc} */ - @Override public V load(@Nullable IgniteTx tx, K key) throws IgniteCheckedException { + @Override public V load(K key) { if (log.isDebugEnabled()) - log.debug("Store load [key=" + key + ", tx=" + tx + ']'); + log.debug("Store load [key=" + key + ']'); StatefulValue<V> val = writeCache.get(key); @@ -455,41 +453,49 @@ public class GridCacheWriteBehindStore<K, V> implements GridCacheStore<K, V>, Li } } - return store.load(null, key); + return store.load(key); } /** {@inheritDoc} */ - @Override public void putAll(@Nullable IgniteTx tx, @Nullable Map<? extends K, ? extends V> map) - throws IgniteCheckedException { + @Override public void putAll(Map<? extends K, ? extends V> map) { for (Map.Entry<? extends K, ? extends V> e : map.entrySet()) - put(tx, e.getKey(), e.getValue()); + put(e.getKey(), e.getValue()); } /** {@inheritDoc} */ - @Override public void put(@Nullable IgniteTx tx, K key, V val) throws IgniteCheckedException { - if (log.isDebugEnabled()) - log.debug("Store put [key=" + key + ", val=" + val + ", tx=" + tx + ']'); + @Override public void put(K key, V val) { + try { + if (log.isDebugEnabled()) + log.debug("Store put [key=" + key + ", val=" + val + ']'); - updateCache(key, val, StoreOperation.PUT); + updateCache(key, val, StoreOperation.PUT); + } + catch (GridInterruptedException e) { + throw new CacheWriterException(e); + } } /** {@inheritDoc} */ - @Override public void removeAll(@Nullable IgniteTx tx, @Nullable Collection<? extends K> keys) - throws IgniteCheckedException { + @Override public void removeAll(Collection<? extends K> keys) { for (K key : keys) - remove(tx, key); + remove(key); } /** {@inheritDoc} */ - @Override public void remove(@Nullable IgniteTx tx, K key) throws IgniteCheckedException { - if (log.isDebugEnabled()) - log.debug("Store remove [key=" + key + ", tx=" + tx + ']'); + @Override public void remove(K key) { + try { + if (log.isDebugEnabled()) + log.debug("Store remove [key=" + key + ']'); - updateCache(key, null, StoreOperation.RMV); + updateCache(key, null, StoreOperation.RMV); + } + catch (GridInterruptedException e) { + throw new CacheWriterException(e); + } } /** {@inheritDoc} */ - @Override public void txEnd(IgniteTx tx, boolean commit) throws IgniteCheckedException { + @Override public void txEnd(boolean commit) { // No-op. } @@ -678,12 +684,12 @@ public class GridCacheWriteBehindStore<K, V> implements GridCacheStore<K, V>, Li try { switch (operation) { case PUT: - store.putAll(null, vals); + store.putAll(vals); break; case RMV: - store.removeAll(null, vals.keySet()); + store.removeAll(vals.keySet()); break; @@ -693,7 +699,7 @@ public class GridCacheWriteBehindStore<K, V> implements GridCacheStore<K, V>, Li return true; } - catch (IgniteCheckedException e) { + catch (Exception e) { LT.warn(log, e, "Unable to update underlying store: " + store); if (writeCache.sizex() > cacheCriticalSize || stopping.get()) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4b8ec5f2/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/GridCacheJdbcBlobStoreMultithreadedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/GridCacheJdbcBlobStoreMultithreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/GridCacheJdbcBlobStoreMultithreadedSelfTest.java new file mode 100644 index 0000000..1c60452 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/GridCacheJdbcBlobStoreMultithreadedSelfTest.java @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.store.jdbc; + +import org.apache.ignite.cache.store.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.transactions.*; +import org.gridgain.grid.cache.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.gridgain.grid.util.typedef.internal.*; +import org.gridgain.testframework.junits.common.*; +import org.jdk8.backport.*; + +import java.lang.reflect.*; +import java.util.*; +import java.util.concurrent.*; + +import static org.gridgain.grid.cache.GridCacheAtomicityMode.*; +import static org.gridgain.grid.cache.GridCacheMode.*; +import static org.gridgain.grid.cache.GridCacheDistributionMode.*; +import static org.gridgain.grid.cache.GridCacheWriteSynchronizationMode.*; +import static org.gridgain.testframework.GridTestUtils.*; + +/** + * + */ +public class GridCacheJdbcBlobStoreMultithreadedSelfTest extends GridCommonAbstractTest { + /** IP finder. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** Number of grids to start. */ + private static final int GRID_CNT = 5; + + /** Number of transactions. */ + private static final int TX_CNT = 1000; + + /** Cache store. */ + private static CacheStore<Integer, String> store; + + /** Distribution mode. */ + private GridCacheDistributionMode mode; + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + store = store(); + + mode = NEAR_PARTITIONED; + + startGridsMultiThreaded(GRID_CNT - 2); + + mode = NEAR_ONLY; + + startGrid(GRID_CNT - 2); + + mode = CLIENT_ONLY; + + startGrid(GRID_CNT - 1); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected final IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration c = super.getConfiguration(gridName); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(IP_FINDER); + + c.setDiscoverySpi(disco); + + GridCacheConfiguration cc = defaultCacheConfiguration(); + + cc.setCacheMode(PARTITIONED); + cc.setWriteSynchronizationMode(FULL_SYNC); + cc.setSwapEnabled(false); + cc.setAtomicityMode(TRANSACTIONAL); + cc.setBackups(1); + cc.setDistributionMode(mode); + + cc.setStore(store); + + c.setCacheConfiguration(cc); + + return c; + } + + /** + * @throws Exception If failed. + */ + public void testMultithreadedPut() throws Exception { + IgniteFuture<?> fut1 = runMultiThreadedAsync(new Callable<Object>() { + private final Random rnd = new Random(); + + @Override public Object call() throws Exception { + for (int i = 0; i < TX_CNT; i++) { + GridCache<Integer, String> cache = cache(rnd.nextInt(GRID_CNT)); + + cache.put(rnd.nextInt(1000), "value"); + } + + return null; + } + }, 4, "put"); + + IgniteFuture<?> fut2 = runMultiThreadedAsync(new Callable<Object>() { + private final Random rnd = new Random(); + + @Override public Object call() throws Exception { + for (int i = 0; i < TX_CNT; i++) { + GridCache<Integer, String> cache = cache(rnd.nextInt(GRID_CNT)); + + cache.putIfAbsent(rnd.nextInt(1000), "value"); + } + + return null; + } + }, 4, "putIfAbsent"); + + fut1.get(); + fut2.get(); + + long opened = ((LongAdder)U.field(store, "opened")).sum(); + long closed = ((LongAdder)U.field(store, "closed")).sum(); + + assert opened > 0; + assert closed > 0; + + assertEquals(opened, closed); + } + + /** + * @throws Exception If failed. + */ + public void testMultithreadedPutAll() throws Exception { + runMultiThreaded(new Callable<Object>() { + private final Random rnd = new Random(); + + @Override public Object call() throws Exception { + for (int i = 0; i < TX_CNT; i++) { + Map<Integer, String> map = new TreeMap<>(); + + for (int j = 0; j < 10; j++) + map.put(rnd.nextInt(1000), "value"); + + GridCache<Integer, String> cache = cache(rnd.nextInt(GRID_CNT)); + + cache.putAll(map); + } + + return null; + } + }, 8, "putAll"); + + long opened = ((LongAdder)U.field(store, "opened")).sum(); + long closed = ((LongAdder)U.field(store, "closed")).sum(); + + assert opened > 0; + assert closed > 0; + + assertEquals(opened, closed); + } + + /** + * @throws Exception If failed. + */ + public void testMultithreadedExplicitTx() throws Exception { + runMultiThreaded(new Callable<Object>() { + private final Random rnd = new Random(); + + @Override public Object call() throws Exception { + for (int i = 0; i < TX_CNT; i++) { + GridCache<Integer, String> cache = cache(rnd.nextInt(GRID_CNT)); + + try (IgniteTx tx = cache.txStart()) { + cache.put(1, "value"); + cache.put(2, "value"); + cache.put(3, "value"); + + cache.get(1); + cache.get(4); + + Map<Integer, String> map = new TreeMap<>(); + + map.put(5, "value"); + map.put(6, "value"); + + cache.putAll(map); + + tx.commit(); + } + } + + return null; + } + }, 8, "tx"); + + long opened = ((LongAdder)U.field(store, "opened")).sum(); + long closed = ((LongAdder)U.field(store, "closed")).sum(); + + assert opened > 0; + assert closed > 0; + + assertEquals(opened, closed); + } + + /** + * @return New store. + * @throws Exception In case of error. + */ + private CacheStore<Integer, String> store() throws Exception { + CacheStore<Integer, String> store = new CacheJdbcBlobStore<>(); + + Field f = store.getClass().getDeclaredField("testMode"); + + f.setAccessible(true); + + f.set(store, true); + + return store; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4b8ec5f2/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/GridCacheJdbcBlobStoreSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/GridCacheJdbcBlobStoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/GridCacheJdbcBlobStoreSelfTest.java new file mode 100644 index 0000000..3011276 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/GridCacheJdbcBlobStoreSelfTest.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.store.jdbc; + +import org.gridgain.testframework.junits.cache.*; + +import java.sql.*; + +/** + * Cache store test. + */ +public class GridCacheJdbcBlobStoreSelfTest + extends GridAbstractCacheStoreSelfTest<CacheJdbcBlobStore<Object, Object>> { + /** + * @throws Exception If failed. + */ + public GridCacheJdbcBlobStoreSelfTest() throws Exception { + // No-op. + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + try (Connection c = DriverManager.getConnection(CacheJdbcBlobStore.DFLT_CONN_URL, null, null)) { + try (Statement s = c.createStatement()) { + s.executeUpdate("drop table ENTRIES"); + } + } + } + + /** {@inheritDoc} */ + @Override protected CacheJdbcBlobStore<Object, Object> store() { + return new CacheJdbcBlobStore<>(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4b8ec5f2/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/package.html ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/package.html b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/package.html new file mode 100644 index 0000000..1f85ff2 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/package.html @@ -0,0 +1,23 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> +<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> +<html> +<body> + <!-- Package description. --> + Contains internal tests or test related classes and interfaces. +</body> +</html> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4b8ec5f2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractTest.java index 04390bc..9caba0b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractTest.java @@ -10,6 +10,7 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.*; +import org.apache.ignite.cache.store.*; import org.apache.ignite.configuration.*; import org.apache.ignite.lang.*; import org.apache.ignite.marshaller.optimized.*; @@ -18,7 +19,6 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; import org.apache.ignite.transactions.*; import org.gridgain.grid.cache.*; -import org.gridgain.grid.cache.store.*; import org.gridgain.testframework.junits.common.*; import org.jdk8.backport.*; import org.jetbrains.annotations.*; @@ -131,7 +131,7 @@ public abstract class IgniteCacheAbstractTest extends GridCommonAbstractTest { /** * @return Cache store. */ - protected GridCacheStore<?, ?> cacheStore() { + protected CacheStore<?, ?> cacheStore() { return null; } @@ -210,7 +210,7 @@ public abstract class IgniteCacheAbstractTest extends GridCommonAbstractTest { /** * */ - public class TestStore extends GridCacheStoreAdapter<Object, Object> { + public class TestStore extends CacheStoreAdapter<Object, Object> { /** {@inheritDoc} */ @Override public void loadCache(IgniteBiInClosure<Object, Object> clo, Object... args) { for (Map.Entry<Object, Object> e : storeMap.entrySet()) @@ -218,17 +218,17 @@ public abstract class IgniteCacheAbstractTest extends GridCommonAbstractTest { } /** {@inheritDoc} */ - @Override public Object load(IgniteTx tx, Object key) { + @Override public Object load(Object key) { return storeMap.get(key); } /** {@inheritDoc} */ - @Override public void put(IgniteTx tx, Object key, @Nullable Object val) { + @Override public void put(Object key, @Nullable Object val) { storeMap.put(key, val); } /** {@inheritDoc} */ - @Override public void remove(IgniteTx tx, Object key) { + @Override public void remove(Object key) { storeMap.remove(key); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4b8ec5f2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicLocalWithStoreInvokeTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicLocalWithStoreInvokeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicLocalWithStoreInvokeTest.java index 2ff0468..3c775f8 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicLocalWithStoreInvokeTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicLocalWithStoreInvokeTest.java @@ -9,14 +9,14 @@ package org.apache.ignite.internal.processors.cache; -import org.gridgain.grid.cache.store.*; +import org.apache.ignite.cache.store.*; /** * */ public class IgniteCacheAtomicLocalWithStoreInvokeTest extends IgniteCacheAtomicLocalInvokeTest { /** {@inheritDoc} */ - @Override protected GridCacheStore<?, ?> cacheStore() { + @Override protected CacheStore<?, ?> cacheStore() { return new TestStore(); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4b8ec5f2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicPrimaryWriteOrderWithStoreInvokeTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicPrimaryWriteOrderWithStoreInvokeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicPrimaryWriteOrderWithStoreInvokeTest.java index 5fa85e3..9556bcc 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicPrimaryWriteOrderWithStoreInvokeTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicPrimaryWriteOrderWithStoreInvokeTest.java @@ -9,7 +9,7 @@ package org.apache.ignite.internal.processors.cache; -import org.gridgain.grid.cache.store.*; +import org.apache.ignite.cache.store.*; /** * @@ -17,7 +17,7 @@ import org.gridgain.grid.cache.store.*; public class IgniteCacheAtomicPrimaryWriteOrderWithStoreInvokeTest extends IgniteCacheAtomicPrimaryWriteOrderInvokeTest { /** {@inheritDoc} */ - @Override protected GridCacheStore<?, ?> cacheStore() { + @Override protected CacheStore<?, ?> cacheStore() { return new IgniteCacheAbstractTest.TestStore(); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4b8ec5f2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicPrimaryWriteOrderWithStoreExpiryPolicyTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicPrimaryWriteOrderWithStoreExpiryPolicyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicPrimaryWriteOrderWithStoreExpiryPolicyTest.java index 5f55aaa..3fcdeaf 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicPrimaryWriteOrderWithStoreExpiryPolicyTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicPrimaryWriteOrderWithStoreExpiryPolicyTest.java @@ -9,7 +9,7 @@ package org.apache.ignite.internal.processors.cache.expiry; -import org.gridgain.grid.cache.store.*; +import org.apache.ignite.cache.store.*; /** * @@ -17,7 +17,7 @@ import org.gridgain.grid.cache.store.*; public class IgniteCacheAtomicPrimaryWriteOrderWithStoreExpiryPolicyTest extends IgniteCacheAtomicPrimaryWriteOrderExpiryPolicyTest{ /** {@inheritDoc} */ - @Override protected GridCacheStore<?, ?> cacheStore() { + @Override protected CacheStore<?, ?> cacheStore() { return new TestStore(); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4b8ec5f2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicWithStoreExpiryPolicyTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicWithStoreExpiryPolicyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicWithStoreExpiryPolicyTest.java index b64215b..b00354b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicWithStoreExpiryPolicyTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicWithStoreExpiryPolicyTest.java @@ -9,14 +9,14 @@ package org.apache.ignite.internal.processors.cache.expiry; -import org.gridgain.grid.cache.store.*; +import org.apache.ignite.cache.store.*; /** * */ public class IgniteCacheAtomicWithStoreExpiryPolicyTest extends IgniteCacheAtomicExpiryPolicyTest { /** {@inheritDoc} */ - @Override protected GridCacheStore<?, ?> cacheStore() { + @Override protected CacheStore<?, ?> cacheStore() { return new TestStore(); } }