# ignite-42
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/4b8ec5f2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/4b8ec5f2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/4b8ec5f2 Branch: refs/heads/ignite-42 Commit: 4b8ec5f230456293084d9f2224bd5bda90520a81 Parents: 806ce6a Author: sboikov <sboi...@gridgain.com> Authored: Thu Jan 15 09:43:58 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Thu Jan 15 17:42:04 2015 +0300 ---------------------------------------------------------------------- .../store/dummy/CacheDummyPersonStore.java | 20 +- .../hibernate/CacheHibernatePersonStore.java | 55 +- .../store/jdbc/CacheJdbcPersonStore.java | 50 +- .../org/gridgain/client/GridHashMapStore.java | 16 +- .../integration/GridClientAbstractSelfTest.java | 16 +- .../java/org/apache/ignite/IgniteCache.java | 39 +- .../cache/store/CacheLoadOnlyStoreAdapter.java | 325 +++++++++++ .../ignite/cache/store/CacheLocalStore.java | 31 + .../apache/ignite/cache/store/CacheStore.java | 154 +++++ .../ignite/cache/store/CacheStoreAdapter.java | 116 ++++ .../cache/store/CacheStoreBalancingWrapper.java | 293 ++++++++++ .../cache/store/jdbc/CacheJdbcBlobStore.java | 573 +++++++++++++++++++ .../apache/ignite/cache/store/jdbc/package.html | 24 + .../org/apache/ignite/cache/store/package.html | 23 + .../java/org/gridgain/grid/cache/GridCache.java | 13 +- .../grid/cache/GridCacheConfiguration.java | 16 +- .../grid/cache/GridCacheProjection.java | 143 +++-- .../store/GridCacheLoadOnlyStoreAdapter.java | 328 ----------- .../grid/cache/store/GridCacheLocalStore.java | 31 - .../grid/cache/store/GridCacheStore.java | 220 ------- .../grid/cache/store/GridCacheStoreAdapter.java | 113 ---- .../store/GridCacheStoreBalancingWrapper.java | 278 --------- .../store/jdbc/GridCacheJdbcBlobStore.java | 552 ------------------ .../gridgain/grid/cache/store/jdbc/package.html | 24 - .../org/gridgain/grid/cache/store/package.html | 23 - .../kernal/processors/cache/CacheEntryImpl.java | 60 ++ .../cache/GridCacheLoaderWriterStore.java | 121 +--- .../processors/cache/GridCacheProcessor.java | 6 +- .../processors/cache/GridCacheProjectionEx.java | 17 +- .../processors/cache/GridCacheStoreManager.java | 92 ++- .../cache/GridCacheWriteBehindStore.java | 74 +-- ...CacheJdbcBlobStoreMultithreadedSelfTest.java | 243 ++++++++ .../jdbc/GridCacheJdbcBlobStoreSelfTest.java | 51 ++ .../apache/ignite/cache/store/jdbc/package.html | 23 + .../cache/IgniteCacheAbstractTest.java | 12 +- ...niteCacheAtomicLocalWithStoreInvokeTest.java | 4 +- ...micPrimaryWriteOrderWithStoreInvokeTest.java | 4 +- ...maryWriteOrderWithStoreExpiryPolicyTest.java | 4 +- ...iteCacheAtomicWithStoreExpiryPolicyTest.java | 4 +- .../IgniteCacheTxWithStoreExpiryPolicyTest.java | 4 +- .../store/GridCacheBalancingStoreSelfTest.java | 43 +- .../GridCacheLoadOnlyStoreAdapterSelfTest.java | 6 +- .../cache/store/GridGeneratingTestStore.java | 38 +- ...CacheJdbcBlobStoreMultithreadedSelfTest.java | 243 -------- .../jdbc/GridCacheJdbcBlobStoreSelfTest.java | 51 -- .../gridgain/grid/cache/store/jdbc/package.html | 23 - .../cache/GridCacheAbstractFlagsTest.java | 4 +- .../cache/GridCacheAbstractSelfTest.java | 12 +- .../cache/GridCacheBasicStoreAbstractTest.java | 4 +- ...acheBasicStoreMultithreadedAbstractTest.java | 12 +- ...idCacheConfigurationConsistencySelfTest.java | 31 +- .../cache/GridCacheGenericTestStore.java | 77 +-- .../GridCacheGroupLockAbstractSelfTest.java | 16 +- .../cache/GridCacheLifecycleAwareSelfTest.java | 25 +- ...ridCacheMultinodeUpdateAbstractSelfTest.java | 4 +- .../cache/GridCachePartitionedWritesTest.java | 13 +- .../cache/GridCacheReloadSelfTest.java | 11 +- .../cache/GridCacheStorePutxSelfTest.java | 30 +- .../cache/GridCacheSwapReloadSelfTest.java | 12 +- .../processors/cache/GridCacheTestStore.java | 109 ++-- ...idCacheWriteBehindStoreAbstractSelfTest.java | 13 +- .../GridCacheWriteBehindStoreSelfTest.java | 50 +- .../IgniteTxStoreExceptionAbstractSelfTest.java | 36 +- ...CacheAtomicReferenceApiSelfAbstractTest.java | 4 +- ...chePartitionedReloadAllAbstractSelfTest.java | 12 +- .../dht/GridCacheColocatedDebugTest.java | 6 +- .../dht/GridCacheGlobalLoadTest.java | 15 +- .../near/GridCacheGetStoreErrorSelfTest.java | 15 +- .../near/GridCacheNearMultiNodeSelfTest.java | 11 +- .../near/GridCacheNearOneNodeSelfTest.java | 11 +- .../GridCacheNearPartitionedClearSelfTest.java | 4 +- .../GridCachePartitionedLoadCacheSelfTest.java | 13 +- .../GridCachePartitionedStorePutSelfTest.java | 12 +- .../near/GridPartitionedBackupLoadSelfTest.java | 14 +- .../GridCacheBatchEvictUnswapSelfTest.java | 13 +- .../GridCacheEmptyEntriesAbstractSelfTest.java | 15 +- .../GridCacheEvictionTouchSelfTest.java | 11 +- ...dCacheAtomicLocalMetricsNoStoreSelfTest.java | 4 +- .../local/GridCacheLocalLoadAllSelfTest.java | 15 +- ...ridCacheContinuousQueryAbstractSelfTest.java | 15 +- .../GridCacheWriteBehindStoreLoadTest.java | 11 +- .../colocation/GridTestCacheStore.java | 22 +- .../swap/GridSwapEvictAllBenchmark.java | 13 +- .../testframework/junits/GridTestResources.java | 7 + .../cache/GridAbstractCacheStoreSelfTest.java | 174 +++--- .../junits/cache/TestCacheSession.java | 59 ++ .../cache/TestThreadLocalCacheSession.java | 57 ++ .../bamboo/GridDataGridTestSuite.java | 2 +- .../hibernate/GridCacheHibernateBlobStore.java | 67 ++- .../GridCacheHibernateBlobStoreSelfTest.java | 6 +- .../cache/GridCacheAbstractQuerySelfTest.java | 12 +- .../cache/GridCacheQueryLoadSelfTest.java | 16 +- 92 files changed, 2931 insertions(+), 2763 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4b8ec5f2/examples/src/main/java/org/gridgain/examples/datagrid/store/dummy/CacheDummyPersonStore.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/gridgain/examples/datagrid/store/dummy/CacheDummyPersonStore.java b/examples/src/main/java/org/gridgain/examples/datagrid/store/dummy/CacheDummyPersonStore.java index 48390ac..f4b6553 100644 --- a/examples/src/main/java/org/gridgain/examples/datagrid/store/dummy/CacheDummyPersonStore.java +++ b/examples/src/main/java/org/gridgain/examples/datagrid/store/dummy/CacheDummyPersonStore.java @@ -18,12 +18,12 @@ package org.gridgain.examples.datagrid.store.dummy; import org.apache.ignite.*; +import org.apache.ignite.cache.store.*; import org.apache.ignite.lang.*; import org.apache.ignite.resources.*; import org.apache.ignite.transactions.*; import org.gridgain.examples.datagrid.store.*; import org.gridgain.grid.cache.*; -import org.gridgain.grid.cache.store.*; import org.jetbrains.annotations.*; import java.util.*; @@ -32,41 +32,47 @@ import java.util.concurrent.*; /** * Dummy cache store implementation. */ -public class CacheDummyPersonStore extends GridCacheStoreAdapter<Long, Person> { +public class CacheDummyPersonStore extends CacheStoreAdapter<Long, Person> { /** Auto-inject grid instance. */ @IgniteInstanceResource private Ignite ignite; /** Auto-inject cache name. */ - @GridCacheName + @IgniteCacheNameResource private String cacheName; /** Dummy database. */ private Map<Long, Person> dummyDB = new ConcurrentHashMap<>(); /** {@inheritDoc} */ - @Override public Person load(@Nullable IgniteTx tx, Long key) throws IgniteCheckedException { + @Override public Person load(Long key) { + IgniteTx tx = transaction(); + System.out.println(">>> Store load [key=" + key + ", xid=" + (tx == null ? null : tx.xid()) + ']'); return dummyDB.get(key); } /** {@inheritDoc} */ - @Override public void put(@Nullable IgniteTx tx, Long key, Person val) throws IgniteCheckedException { + @Override public void put(Long key, Person val) { + IgniteTx tx = transaction(); + System.out.println(">>> Store put [key=" + key + ", val=" + val + ", xid=" + (tx == null ? null : tx.xid()) + ']'); dummyDB.put(key, val); } /** {@inheritDoc} */ - @Override public void remove(@Nullable IgniteTx tx, Long key) throws IgniteCheckedException { + @Override public void remove(Long key) { + IgniteTx tx = transaction(); + System.out.println(">>> Store remove [key=" + key + ", xid=" + (tx == null ? null : tx.xid()) + ']'); dummyDB.remove(key); } /** {@inheritDoc} */ - @Override public void loadCache(IgniteBiInClosure<Long, Person> clo, Object... args) throws IgniteCheckedException { + @Override public void loadCache(IgniteBiInClosure<Long, Person> clo, Object... args) { int cnt = (Integer)args[0]; System.out.println(">>> Store loadCache for entry count: " + cnt); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4b8ec5f2/examples/src/main/java/org/gridgain/examples/datagrid/store/hibernate/CacheHibernatePersonStore.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/gridgain/examples/datagrid/store/hibernate/CacheHibernatePersonStore.java b/examples/src/main/java/org/gridgain/examples/datagrid/store/hibernate/CacheHibernatePersonStore.java index cc0bbc1..c671108 100644 --- a/examples/src/main/java/org/gridgain/examples/datagrid/store/hibernate/CacheHibernatePersonStore.java +++ b/examples/src/main/java/org/gridgain/examples/datagrid/store/hibernate/CacheHibernatePersonStore.java @@ -17,22 +17,22 @@ package org.gridgain.examples.datagrid.store.hibernate; -import org.apache.ignite.*; +import org.apache.ignite.cache.store.*; import org.apache.ignite.lang.*; import org.apache.ignite.transactions.*; import org.gridgain.examples.datagrid.store.*; -import org.gridgain.grid.cache.store.*; import org.hibernate.*; import org.hibernate.cfg.*; import org.jetbrains.annotations.*; +import javax.cache.integration.*; import java.util.*; /** - * Example of {@link GridCacheStore} implementation that uses Hibernate + * Example of {@link CacheStore} implementation that uses Hibernate * and deals with maps {@link UUID} to {@link Person}. */ -public class CacheHibernatePersonStore extends GridCacheStoreAdapter<Long, Person> { +public class CacheHibernatePersonStore extends CacheStoreAdapter<Long, Person> { /** Default hibernate configuration resource path. */ private static final String DFLT_HIBERNATE_CFG = "/org/gridgain/examples/datagrid/store/hibernate/hibernate.cfg.xml"; @@ -50,7 +50,9 @@ public class CacheHibernatePersonStore extends GridCacheStoreAdapter<Long, Perso } /** {@inheritDoc} */ - @Override public Person load(@Nullable IgniteTx tx, Long key) throws IgniteCheckedException { + @Override public Person load(Long key) { + IgniteTx tx = transaction(); + System.out.println(">>> Store load [key=" + key + ", xid=" + (tx == null ? null : tx.xid()) + ']'); Session ses = session(tx); @@ -61,7 +63,7 @@ public class CacheHibernatePersonStore extends GridCacheStoreAdapter<Long, Perso catch (HibernateException e) { rollback(ses, tx); - throw new IgniteCheckedException("Failed to load value from cache store with key: " + key, e); + throw new CacheLoaderException("Failed to load value from cache store with key: " + key, e); } finally { end(ses, tx); @@ -69,12 +71,13 @@ public class CacheHibernatePersonStore extends GridCacheStoreAdapter<Long, Perso } /** {@inheritDoc} */ - @Override public void put(@Nullable IgniteTx tx, Long key, @Nullable Person val) - throws IgniteCheckedException { + @Override public void put(Long key, @Nullable Person val) { + IgniteTx tx = transaction(); + System.out.println(">>> Store put [key=" + key + ", val=" + val + ", xid=" + (tx == null ? null : tx.xid()) + ']'); if (val == null) { - remove(tx, key); + remove(key); return; } @@ -87,7 +90,7 @@ public class CacheHibernatePersonStore extends GridCacheStoreAdapter<Long, Perso catch (HibernateException e) { rollback(ses, tx); - throw new IgniteCheckedException("Failed to put value to cache store [key=" + key + ", val" + val + "]", e); + throw new CacheWriterException("Failed to put value to cache store [key=" + key + ", val" + val + "]", e); } finally { end(ses, tx); @@ -96,7 +99,9 @@ public class CacheHibernatePersonStore extends GridCacheStoreAdapter<Long, Perso /** {@inheritDoc} */ @SuppressWarnings({"JpaQueryApiInspection"}) - @Override public void remove(@Nullable IgniteTx tx, Long key) throws IgniteCheckedException { + @Override public void remove(Long key) { + IgniteTx tx = transaction(); + System.out.println(">>> Store remove [key=" + key + ", xid=" + (tx == null ? null : tx.xid()) + ']'); Session ses = session(tx); @@ -108,7 +113,7 @@ public class CacheHibernatePersonStore extends GridCacheStoreAdapter<Long, Perso catch (HibernateException e) { rollback(ses, tx); - throw new IgniteCheckedException("Failed to remove value from cache store with key: " + key, e); + throw new CacheWriterException("Failed to remove value from cache store with key: " + key, e); } finally { end(ses, tx); @@ -116,9 +121,9 @@ public class CacheHibernatePersonStore extends GridCacheStoreAdapter<Long, Perso } /** {@inheritDoc} */ - @Override public void loadCache(IgniteBiInClosure<Long, Person> clo, Object... args) throws IgniteCheckedException { + @Override public void loadCache(IgniteBiInClosure<Long, Person> clo, Object... args) { if (args == null || args.length == 0 || args[0] == null) - throw new IgniteCheckedException("Expected entry count parameter is not provided."); + throw new CacheLoaderException("Expected entry count parameter is not provided."); final int entryCnt = (Integer)args[0]; @@ -144,7 +149,7 @@ public class CacheHibernatePersonStore extends GridCacheStoreAdapter<Long, Perso System.out.println(">>> Loaded " + cnt + " values into cache."); } catch (HibernateException e) { - throw new IgniteCheckedException("Failed to load values from cache store.", e); + throw new CacheLoaderException("Failed to load values from cache store.", e); } finally { end(ses, null); @@ -188,8 +193,14 @@ public class CacheHibernatePersonStore extends GridCacheStoreAdapter<Long, Perso } /** {@inheritDoc} */ - @Override public void txEnd(IgniteTx tx, boolean commit) throws IgniteCheckedException { - Session ses = tx.removeMeta(ATTR_SES); + @Override public void txEnd(boolean commit) { + CacheStoreSession storeSes = session(); + + IgniteTx tx = storeSes.transaction(); + + Map<Object, Object> props = storeSes.properties(); + + Session ses = (Session)props.remove(ATTR_SES); if (ses != null) { Transaction hTx = ses.getTransaction(); @@ -207,7 +218,7 @@ public class CacheHibernatePersonStore extends GridCacheStoreAdapter<Long, Perso System.out.println("Transaction ended [xid=" + tx.xid() + ", commit=" + commit + ']'); } catch (HibernateException e) { - throw new IgniteCheckedException("Failed to end transaction [xid=" + tx.xid() + + throw new CacheWriterException("Failed to end transaction [xid=" + tx.xid() + ", commit=" + commit + ']', e); } finally { @@ -227,16 +238,18 @@ public class CacheHibernatePersonStore extends GridCacheStoreAdapter<Long, Perso Session ses; if (tx != null) { - ses = tx.meta(ATTR_SES); + Map<Object, Object> props = session().properties(); + + ses = (Session)props.get(ATTR_SES); if (ses == null) { ses = sesFactory.openSession(); ses.beginTransaction(); - // Store session in transaction metadata, so it can be accessed + // Store session in session properties, so it can be accessed // for other operations on the same transaction. - tx.addMeta(ATTR_SES, ses); + props.put(ATTR_SES, ses); System.out.println("Hibernate session open [ses=" + ses + ", tx=" + tx.xid() + "]"); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4b8ec5f2/examples/src/main/java/org/gridgain/examples/datagrid/store/jdbc/CacheJdbcPersonStore.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/gridgain/examples/datagrid/store/jdbc/CacheJdbcPersonStore.java b/examples/src/main/java/org/gridgain/examples/datagrid/store/jdbc/CacheJdbcPersonStore.java index 2aeb655..2711dba 100644 --- a/examples/src/main/java/org/gridgain/examples/datagrid/store/jdbc/CacheJdbcPersonStore.java +++ b/examples/src/main/java/org/gridgain/examples/datagrid/store/jdbc/CacheJdbcPersonStore.java @@ -18,21 +18,22 @@ package org.gridgain.examples.datagrid.store.jdbc; import org.apache.ignite.*; +import org.apache.ignite.cache.store.*; import org.apache.ignite.lang.*; import org.apache.ignite.transactions.*; import org.gridgain.examples.datagrid.store.*; -import org.gridgain.grid.cache.store.*; import org.jetbrains.annotations.*; +import javax.cache.integration.*; import java.sql.*; import java.util.*; /** - * Example of {@link GridCacheStore} implementation that uses JDBC + * Example of {@link CacheStore} implementation that uses JDBC * transaction with cache transactions and maps {@link UUID} to {@link Person}. * */ -public class CacheJdbcPersonStore extends GridCacheStoreAdapter<Long, Person> { +public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> { /** Transaction metadata attribute name. */ private static final String ATTR_NAME = "SIMPLE_STORE_CONNECTION"; @@ -64,8 +65,12 @@ public class CacheJdbcPersonStore extends GridCacheStoreAdapter<Long, Person> { } /** {@inheritDoc} */ - @Override public void txEnd(IgniteTx tx, boolean commit) throws IgniteCheckedException { - try (Connection conn = tx.removeMeta(ATTR_NAME)) { + @Override public void txEnd(boolean commit) { + IgniteTx tx = transaction(); + + Map<Object, Object> props = session().properties(); + + try (Connection conn = (Connection)props.remove(ATTR_NAME)) { if (conn != null) { if (commit) conn.commit(); @@ -76,12 +81,14 @@ public class CacheJdbcPersonStore extends GridCacheStoreAdapter<Long, Person> { System.out.println(">>> Transaction ended [xid=" + tx.xid() + ", commit=" + commit + ']'); } catch (SQLException e) { - throw new IgniteCheckedException("Failed to end transaction [xid=" + tx.xid() + ", commit=" + commit + ']', e); + throw new CacheWriterException("Failed to end transaction [xid=" + tx.xid() + ", commit=" + commit + ']', e); } } /** {@inheritDoc} */ - @Nullable @Override public Person load(@Nullable IgniteTx tx, Long key) throws IgniteCheckedException { + @Nullable @Override public Person load(Long key) { + IgniteTx tx = transaction(); + System.out.println(">>> Store load [key=" + key + ", xid=" + (tx == null ? null : tx.xid()) + ']'); Connection conn = null; @@ -99,7 +106,7 @@ public class CacheJdbcPersonStore extends GridCacheStoreAdapter<Long, Person> { } } catch (SQLException e) { - throw new IgniteCheckedException("Failed to load object: " + key, e); + throw new CacheLoaderException("Failed to load object: " + key, e); } finally { end(tx, conn); @@ -109,8 +116,9 @@ public class CacheJdbcPersonStore extends GridCacheStoreAdapter<Long, Person> { } /** {@inheritDoc} */ - @Override public void put(@Nullable IgniteTx tx, Long key, Person val) - throws IgniteCheckedException { + @Override public void put(Long key, Person val) { + IgniteTx tx = transaction(); + System.out.println(">>> Store put [key=" + key + ", val=" + val + ", xid=" + (tx == null ? null : tx.xid()) + ']'); Connection conn = null; @@ -142,7 +150,7 @@ public class CacheJdbcPersonStore extends GridCacheStoreAdapter<Long, Person> { } } catch (SQLException e) { - throw new IgniteCheckedException("Failed to put object [key=" + key + ", val=" + val + ']', e); + throw new CacheLoaderException("Failed to put object [key=" + key + ", val=" + val + ']', e); } finally { end(tx, conn); @@ -150,7 +158,9 @@ public class CacheJdbcPersonStore extends GridCacheStoreAdapter<Long, Person> { } /** {@inheritDoc} */ - @Override public void remove(@Nullable IgniteTx tx, Long key) throws IgniteCheckedException { + @Override public void remove(Long key) { + IgniteTx tx = transaction(); + System.out.println(">>> Store remove [key=" + key + ", xid=" + (tx == null ? null : tx.xid()) + ']'); Connection conn = null; @@ -165,7 +175,7 @@ public class CacheJdbcPersonStore extends GridCacheStoreAdapter<Long, Person> { } } catch (SQLException e) { - throw new IgniteCheckedException("Failed to remove object: " + key, e); + throw new CacheLoaderException("Failed to remove object: " + key, e); } finally { end(tx, conn); @@ -173,9 +183,9 @@ public class CacheJdbcPersonStore extends GridCacheStoreAdapter<Long, Person> { } /** {@inheritDoc} */ - @Override public void loadCache(IgniteBiInClosure<Long, Person> clo, Object... args) throws IgniteCheckedException { + @Override public void loadCache(IgniteBiInClosure<Long, Person> clo, Object... args) { if (args == null || args.length == 0 || args[0] == null) - throw new IgniteCheckedException("Expected entry count parameter is not provided."); + throw new CacheLoaderException("Expected entry count parameter is not provided."); final int entryCnt = (Integer)args[0]; @@ -201,7 +211,7 @@ public class CacheJdbcPersonStore extends GridCacheStoreAdapter<Long, Person> { } } catch (SQLException e) { - throw new IgniteCheckedException("Failed to load values from cache store.", e); + throw new CacheLoaderException("Failed to load values from cache store.", e); } finally { end(null, conn); @@ -215,14 +225,16 @@ public class CacheJdbcPersonStore extends GridCacheStoreAdapter<Long, Person> { */ private Connection connection(@Nullable IgniteTx tx) throws SQLException { if (tx != null) { - Connection conn = tx.meta(ATTR_NAME); + Map<Object, Object> props = session().properties(); + + Connection conn = (Connection)props.get(ATTR_NAME); if (conn == null) { conn = openConnection(false); - // Store connection in transaction metadata, so it can be accessed + // Store connection in session properties, so it can be accessed // for other operations on the same transaction. - tx.addMeta(ATTR_NAME, conn); + props.put(ATTR_NAME, conn); } return conn; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4b8ec5f2/modules/clients/src/test/java/org/gridgain/client/GridHashMapStore.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/gridgain/client/GridHashMapStore.java b/modules/clients/src/test/java/org/gridgain/client/GridHashMapStore.java index 67fc50d..5b0fbe7 100644 --- a/modules/clients/src/test/java/org/gridgain/client/GridHashMapStore.java +++ b/modules/clients/src/test/java/org/gridgain/client/GridHashMapStore.java @@ -18,9 +18,9 @@ package org.gridgain.client; import org.apache.ignite.*; +import org.apache.ignite.cache.store.*; import org.apache.ignite.lang.*; import org.apache.ignite.transactions.*; -import org.gridgain.grid.cache.store.*; import org.jetbrains.annotations.*; import java.util.*; @@ -28,32 +28,28 @@ import java.util.*; /** * Simple HashMap based cache store emulation. */ -public class GridHashMapStore extends GridCacheStoreAdapter { +public class GridHashMapStore extends CacheStoreAdapter { /** Map for cache store. */ private final Map<Object, Object> map = new HashMap<>(); /** {@inheritDoc} */ - @Override public void loadCache(IgniteBiInClosure c, Object... args) - throws IgniteCheckedException { + @Override public void loadCache(IgniteBiInClosure c, Object... args) { for (Map.Entry e : map.entrySet()) c.apply(e.getKey(), e.getValue()); } /** {@inheritDoc} */ - @Override public Object load(@Nullable IgniteTx tx, Object key) - throws IgniteCheckedException { + @Override public Object load(Object key) { return map.get(key); } /** {@inheritDoc} */ - @Override public void put(@Nullable IgniteTx tx, Object key, - @Nullable Object val) throws IgniteCheckedException { + @Override public void put(Object key, @Nullable Object val) { map.put(key, val); } /** {@inheritDoc} */ - @Override public void remove(@Nullable IgniteTx tx, Object key) - throws IgniteCheckedException { + @Override public void remove(Object key) { map.remove(key); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4b8ec5f2/modules/clients/src/test/java/org/gridgain/client/integration/GridClientAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/gridgain/client/integration/GridClientAbstractSelfTest.java b/modules/clients/src/test/java/org/gridgain/client/integration/GridClientAbstractSelfTest.java index 58d6894..6e8d1a3 100644 --- a/modules/clients/src/test/java/org/gridgain/client/integration/GridClientAbstractSelfTest.java +++ b/modules/clients/src/test/java/org/gridgain/client/integration/GridClientAbstractSelfTest.java @@ -20,6 +20,7 @@ package org.gridgain.client.integration; import junit.framework.*; import net.sf.json.*; import org.apache.ignite.*; +import org.apache.ignite.cache.store.*; import org.apache.ignite.compute.*; import org.apache.ignite.configuration.*; import org.apache.ignite.lang.*; @@ -32,7 +33,6 @@ import org.gridgain.client.*; import org.gridgain.client.ssl.*; import org.gridgain.grid.cache.*; import org.gridgain.grid.cache.affinity.consistenthash.*; -import org.gridgain.grid.cache.store.*; import org.gridgain.grid.util.typedef.*; import org.gridgain.grid.util.typedef.internal.*; import org.gridgain.testframework.junits.common.*; @@ -1564,33 +1564,29 @@ public abstract class GridClientAbstractSelfTest extends GridCommonAbstractTest /** * Simple HashMap based cache store emulation. */ - private static class HashMapStore extends GridCacheStoreAdapter<Object, Object> { + private static class HashMapStore extends CacheStoreAdapter<Object, Object> { /** Map for cache store. */ private final Map<Object, Object> map = new HashMap<>(); /** {@inheritDoc} */ - @Override public void loadCache(IgniteBiInClosure<Object, Object> clo, Object... args) - throws IgniteCheckedException { + @Override public void loadCache(IgniteBiInClosure<Object, Object> clo, Object... args) { for (Map.Entry e : map.entrySet()) { clo.apply(e.getKey(), e.getValue()); } } /** {@inheritDoc} */ - @Override public Object load(@Nullable IgniteTx tx, Object key) - throws IgniteCheckedException { + @Override public Object load(Object key) { return map.get(key); } /** {@inheritDoc} */ - @Override public void put(@Nullable IgniteTx tx, Object key, - @Nullable Object val) throws IgniteCheckedException { + @Override public void put(Object key, @Nullable Object val) { map.put(key, val); } /** {@inheritDoc} */ - @Override public void remove(@Nullable IgniteTx tx, Object key) - throws IgniteCheckedException { + @Override public void remove(Object key) { map.remove(key); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4b8ec5f2/modules/core/src/main/java/org/apache/ignite/IgniteCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java index 2988005..30850e6 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java @@ -22,7 +22,6 @@ import org.apache.ignite.cache.query.*; import org.apache.ignite.lang.*; import org.apache.ignite.transactions.*; import org.gridgain.grid.cache.*; -import org.gridgain.grid.cache.store.*; import org.jetbrains.annotations.*; import javax.cache.*; @@ -95,20 +94,20 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS * @param p Optional predicate (may be {@code null}). If provided, will be used to * filter values to be put into cache. * @param args Optional user arguments to be passed into - * {@link GridCacheStore#loadCache(IgniteBiInClosure, Object...)} method. + * {@link org.apache.ignite.cache.store.CacheStore#loadCache(IgniteBiInClosure, Object...)} method. * @throws CacheException If loading failed. */ public void loadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) throws CacheException; /** - * Delegates to {@link GridCacheStore#loadCache(IgniteBiInClosure,Object...)} method + * Delegates to {@link org.apache.ignite.cache.store.CacheStore#loadCache(IgniteBiInClosure,Object...)} method * to load state from the underlying persistent storage. The loaded values * will then be given to the optionally passed in predicate, and, if the predicate returns * {@code true}, will be stored in cache. If predicate is {@code null}, then * all loaded values will be stored in cache. * <p> * Note that this method does not receive keys as a parameter, so it is up to - * {@link GridCacheStore} implementation to provide all the data to be loaded. + * {@link org.apache.ignite.cache.store.CacheStore} implementation to provide all the data to be loaded. * <p> * This method is not transactional and may end up loading a stale value into * cache if another thread has updated the value immediately after it has been @@ -118,7 +117,7 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS * @param p Optional predicate (may be {@code null}). If provided, will be used to * filter values to be put into cache. * @param args Optional user arguments to be passed into - * {@link GridCacheStore#loadCache(IgniteBiInClosure, Object...)} method. + * {@link org.apache.ignite.cache.store.CacheStore#loadCache(IgniteBiInClosure, Object...)} method. * @throws CacheException If loading failed. */ public void localLoadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) throws CacheException; @@ -130,14 +129,14 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS * the value will be loaded from the primary node, which in its turn may load the value * from the swap storage, and consecutively, if it's not in swap, * from the underlying persistent storage. If value has to be loaded from persistent - * storage, {@link GridCacheStore#load(IgniteTx, Object)} method will be used. + * storage, {@link org.apache.ignite.cache.store.CacheStore#load(IgniteTx, Object)} method will be used. * <p> * If the returned value is not needed, method {@link #putIfAbsent(Object, Object)} should * always be used instead of this one to avoid the overhead associated with returning of the * previous value. * <p> - * If write-through is enabled, the stored value will be persisted to {@link GridCacheStore} - * via {@link GridCacheStore#put(IgniteTx, Object, Object)} method. + * If write-through is enabled, the stored value will be persisted to {@link org.apache.ignite.cache.store.CacheStore} + * via {@link org.apache.ignite.cache.store.CacheStore#put(IgniteTx, Object, Object)} method. * <h2 class="header">Transactions</h2> * This method is transactional and will enlist the entry into ongoing transaction * if there is one. @@ -164,8 +163,8 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS * are acquired in undefined order, so it may cause a deadlock when used with * other concurrent transactional updates. * <p> - * If write-through is enabled, the values will be removed from {@link GridCacheStore} - * via {@link GridCacheStore#removeAll(IgniteTx, java.util.Collection)} method. + * If write-through is enabled, the values will be removed from {@link org.apache.ignite.cache.store.CacheStore} + * via {@link org.apache.ignite.cache.store.CacheStore#removeAll(IgniteTx, java.util.Collection)} method. * <h2 class="header">Transactions</h2> * This method is transactional and will enlist the entry into ongoing transaction * if there is one. @@ -319,13 +318,13 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS * the value will be loaded from the primary node, which in its turn may load the value * from the swap storage, and consecutively, if it's not in swap, * from the underlying persistent storage. If value has to be loaded from persistent - * storage, {@link GridCacheStore#load(IgniteTx, Object)} method will be used. + * storage, {@link org.apache.ignite.cache.store.CacheStore#load(IgniteTx, Object)} method will be used. * <p> * If the returned value is not needed, method {@link #putIf(Object, Object, IgnitePredicate)} should * always be used instead of this one to avoid the overhead associated with returning of the previous value. * <p> - * If write-through is enabled, the stored value will be persisted to {@link GridCacheStore} - * via {@link GridCacheStore#put(IgniteTx, Object, Object)} method. + * If write-through is enabled, the stored value will be persisted to {@link org.apache.ignite.cache.store.CacheStore} + * via {@link org.apache.ignite.cache.store.CacheStore#put(IgniteTx, Object, Object)} method. * <h2 class="header">Transactions</h2> * This method is transactional and will enlist the entry into ongoing transaction * if there is one. @@ -356,8 +355,8 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS * value and, therefore, does not have any overhead associated with returning a value. It * should be used whenever return value is not required. * <p> - * If write-through is enabled, the stored value will be persisted to {@link GridCacheStore} - * via {@link GridCacheStore#put(IgniteTx, Object, Object)} method. + * If write-through is enabled, the stored value will be persisted to {@link org.apache.ignite.cache.store.CacheStore} + * via {@link org.apache.ignite.cache.store.CacheStore#put(IgniteTx, Object, Object)} method. * <h2 class="header">Transactions</h2> * This method is transactional and will enlist the entry into ongoing transaction * if there is one. @@ -384,14 +383,14 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS * caches, the value will be loaded from the primary node, which in its turn may load the value * from the disk-based swap storage, and consecutively, if it's not in swap, * from the underlying persistent storage. If value has to be loaded from persistent - * storage, {@link GridCacheStore#load(IgniteTx, Object)} method will be used. + * storage, {@link org.apache.ignite.cache.store.CacheStore#load(IgniteTx, Object)} method will be used. * <p> * If the returned value is not needed, method {@link #removeIf(Object, IgnitePredicate)} should * always be used instead of this one to avoid the overhead associated with returning of the * previous value. * <p> - * If write-through is enabled, the value will be removed from {@link GridCacheStore} - * via {@link GridCacheStore#remove(IgniteTx, Object)} method. + * If write-through is enabled, the value will be removed from {@link org.apache.ignite.cache.store.CacheStore} + * via {@link org.apache.ignite.cache.store.CacheStore#remove(IgniteTx, Object)} method. * <h2 class="header">Transactions</h2> * This method is transactional and will enlist the entry into ongoing transaction * if there is one. @@ -416,8 +415,8 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS * This method will return {@code true} if remove did occur, which means that all optionally * provided filters have passed and there was something to remove, {@code false} otherwise. * <p> - * If write-through is enabled, the value will be removed from {@link GridCacheStore} - * via {@link GridCacheStore#remove(IgniteTx, Object)} method. + * If write-through is enabled, the value will be removed from {@link org.apache.ignite.cache.store.CacheStore} + * via {@link org.apache.ignite.cache.store.CacheStore#remove(IgniteTx, Object)} method. * <h2 class="header">Transactions</h2> * This method is transactional and will enlist the entry into ongoing transaction * if there is one. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4b8ec5f2/modules/core/src/main/java/org/apache/ignite/cache/store/CacheLoadOnlyStoreAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheLoadOnlyStoreAdapter.java b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheLoadOnlyStoreAdapter.java new file mode 100644 index 0000000..0d7a85b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheLoadOnlyStoreAdapter.java @@ -0,0 +1,325 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.store; + +import org.apache.ignite.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.transactions.*; +import org.gridgain.grid.cache.*; +import org.gridgain.grid.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import javax.cache.*; +import javax.cache.integration.*; +import java.util.*; +import java.util.concurrent.*; + +import static java.util.concurrent.TimeUnit.*; + +/** + * This adepter designed to support stores with bulk loading from stream-like source. + * <p> + * This class processes input data in the following way: + * <ul> + * <li> + * Iterator of input record obtained from user-defined {@link #inputIterator(Object...)}. + * </li> + * <li> + * Iterator continuously queried for input records and they are grouped into batches of {@link #batchSize}. + * </li> + * <li> + * Batch is placed into processing queue and puled by one of {@link #threadsCnt} working threads. + * </li> + * <li> + * Each record in batch is passed to user-defined {@link #parse(Object, Object...)} method + * and result is stored into cache. + * </li> + * </ul> + * <p> + * Two methods should be implemented by inheritants: + * <ul> + * <li> + * {@link #inputIterator(Object...)}. It should open underlying data source + * and iterate all record available in it. Individual records could be in very raw form, + * like text lines for CSV files. + * </li> + * <li> + * {@link #parse(Object, Object...)}. This method should process input records + * and transform them into key-value pairs for cache. + * </li> + * </ul> + * <p> + * + * @param <K> Key type. + * @param <V> Value type. + * @param <I> Input type. + */ +public abstract class CacheLoadOnlyStoreAdapter<K, V, I> implements CacheStore<K, V> { + /** + * Default batch size (number of records read with {@link #inputIterator(Object...)} + * and then submitted to internal pool at a time). + */ + public static final int DFLT_BATCH_SIZE = 100; + + /** Default batch queue size (max batches count to limit memory usage). */ + public static final int DFLT_BATCH_QUEUE_SIZE = 100; + + /** Default number of working threads (equal to the number of available processors). */ + public static final int DFLT_THREADS_COUNT = Runtime.getRuntime().availableProcessors(); + + /** Auto-injected logger. */ + @IgniteLoggerResource + private IgniteLogger log; + + /** Batch size. */ + private int batchSize = DFLT_BATCH_SIZE; + + /** Size of queue of batches to process. */ + private int batchQueueSize = DFLT_BATCH_QUEUE_SIZE; + + /** Number fo working threads. */ + private int threadsCnt = DFLT_THREADS_COUNT; + + /** + * Returns iterator of input records. + * <p> + * Note that returned iterator doesn't have to be thread-safe. Thus it could + * operate on raw streams, DB connections, etc. without additional synchronization. + * + * @param args Arguments passes into {@link GridCache#loadCache(org.apache.ignite.lang.IgniteBiPredicate, long, Object...)} method. + * @return Iterator over input records. + * @throws CacheLoaderException If iterator can't be created with the given arguments. + */ + protected abstract Iterator<I> inputIterator(@Nullable Object... args) throws CacheLoaderException; + + /** + * This method should transform raw data records into valid key-value pairs + * to be stored into cache. + * <p> + * If {@code null} is returned then this record will be just skipped. + * + * @param rec A raw data record. + * @param args Arguments passed into {@link GridCache#loadCache(org.apache.ignite.lang.IgniteBiPredicate, long, Object...)} method. + * @return Cache entry to be saved in cache or {@code null} if no entry could be produced from this record. + */ + @Nullable protected abstract IgniteBiTuple<K, V> parse(I rec, @Nullable Object... args); + + /** {@inheritDoc} */ + @Override public void loadCache(IgniteBiInClosure<K, V> c, @Nullable Object... args) { + ExecutorService exec = new ThreadPoolExecutor( + threadsCnt, + threadsCnt, + 0L, + MILLISECONDS, + new ArrayBlockingQueue<Runnable>(batchQueueSize), + new BlockingRejectedExecutionHandler()); + + Iterator<I> iter = inputIterator(args); + + Collection<I> buf = new ArrayList<>(batchSize); + + try { + while (iter.hasNext()) { + if (Thread.currentThread().isInterrupted()) { + U.warn(log, "Working thread was interrupted while loading data."); + + break; + } + + buf.add(iter.next()); + + if (buf.size() == batchSize) { + exec.submit(new Worker(c, buf, args)); + + buf = new ArrayList<>(batchSize); + } + } + + if (!buf.isEmpty()) + exec.submit(new Worker(c, buf, args)); + } + catch (RejectedExecutionException ignored) { + // Because of custom RejectedExecutionHandler. + assert false : "RejectedExecutionException was thrown while it shouldn't."; + } + finally { + exec.shutdown(); + + try { + exec.awaitTermination(Long.MAX_VALUE, MILLISECONDS); + } + catch (InterruptedException ignored) { + U.warn(log, "Working thread was interrupted while waiting for put operations to complete."); + + Thread.currentThread().interrupt(); + } + } + } + + /** + * Returns batch size. + * + * @return Batch size. + */ + public int getBatchSize() { + return batchSize; + } + + /** + * Sets batch size. + * + * @param batchSize Batch size. + */ + public void setBatchSize(int batchSize) { + this.batchSize = batchSize; + } + + /** + * Returns batch queue size. + * + * @return Batch queue size. + */ + public int getBatchQueueSize() { + return batchQueueSize; + } + + /** + * Sets batch queue size. + * + * @param batchQueueSize Batch queue size. + */ + public void setBatchQueueSize(int batchQueueSize) { + this.batchQueueSize = batchQueueSize; + } + + /** + * Returns number of worker threads. + * + * @return Number of worker threads. + */ + public int getThreadsCount() { + return threadsCnt; + } + + /** + * Sets number of worker threads. + * + * @param threadsCnt Number of worker threads. + */ + public void setThreadsCount(int threadsCnt) { + this.threadsCnt = threadsCnt; + } + + /** {@inheritDoc} */ + @Override public V load(K key) { + return null; + } + + /** {@inheritDoc} */ + @Override public Map<K, V> loadAll(Iterable<? extends K> keys) { + return Collections.emptyMap(); + } + + /** {@inheritDoc} */ + @Override public void write(Cache.Entry<? extends K, ? extends V> entry) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void writeAll(Collection<Cache.Entry<? extends K, ? extends V>> entries) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void delete(Object key) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void deleteAll(Collection<?> keys) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void txEnd(boolean commit) { + // No-op. + } + + /** + * Worker. + */ + private class Worker implements Runnable { + /** */ + private final IgniteBiInClosure<K, V> c; + + /** */ + private final Collection<I> buf; + + /** */ + private final Object[] args; + + /** + * @param c Closure for loaded entries. + * @param buf Set of input records to process. + * @param args Arguments passed into {@link GridCache#loadCache(org.apache.ignite.lang.IgniteBiPredicate, long, Object...)} method. + */ + Worker(IgniteBiInClosure<K, V> c, Collection<I> buf, Object[] args) { + this.c = c; + this.buf = buf; + this.args = args; + } + + /** {@inheritDoc} */ + @Override public void run() { + for (I rec : buf) { + IgniteBiTuple<K, V> entry = parse(rec, args); + + if (entry != null) + c.apply(entry.getKey(), entry.getValue()); + } + } + } + + /** + * This handler blocks the caller thread until free space will be available in tasks queue. + * If the executor is shut down than it throws {@link RejectedExecutionException}. + * <p> + * It is save to apply this policy when: + * <ol> + * <li>{@code shutdownNow} is not used on the pool.</li> + * <li>{@code shutdown} is called from the thread where all submissions where performed.</li> + * </ol> + */ + private class BlockingRejectedExecutionHandler implements RejectedExecutionHandler { + /** {@inheritDoc} */ + @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { + try { + if (executor.isShutdown()) + throw new RejectedExecutionException(); + else + executor.getQueue().put(r); + } + catch (InterruptedException ignored) { + U.warn(log, "Working thread was interrupted while loading data."); + + Thread.currentThread().interrupt(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4b8ec5f2/modules/core/src/main/java/org/apache/ignite/cache/store/CacheLocalStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheLocalStore.java b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheLocalStore.java new file mode 100644 index 0000000..6fdec5a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheLocalStore.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.store; + +import java.lang.annotation.*; + +/** + * Annotation for local {@link CacheStore} implementation. "Local" here means that there is no global + * database behind the grid but each node has an independent one. + */ +@Documented +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.TYPE}) +public @interface CacheLocalStore { + // No-op. +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4b8ec5f2/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStore.java b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStore.java new file mode 100644 index 0000000..4cdfe5a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStore.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.store; + +import org.apache.ignite.IgnitePortables; +import org.apache.ignite.cache.store.jdbc.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.portables.*; +import org.apache.ignite.transactions.*; +import org.gridgain.grid.*; +import org.gridgain.grid.cache.*; +import org.jetbrains.annotations.*; + +import javax.cache.integration.*; +import java.sql.*; +import java.util.*; +import java.util.Date; + +import static javax.cache.Cache.*; + +/** + * API for cache persistent storage for read-through and write-through behavior. + * Persistent store is configured via {@link GridCacheConfiguration#getStore()} + * configuration property. If not provided, values will be only kept in cache memory + * or swap storage without ever being persisted to a persistent storage. + * <p> + * {@link CacheStoreAdapter} provides default implementation for bulk operations, + * such as {@link #loadAll(Iterable)}, + * {@link #writeAll(Collection)}, and {@link #deleteAll(Collection)} + * by sequentially calling corresponding {@link #load(Object)}, + * {@link #write(Entry)}, and {@link #delete(Object)} + * operations. Use this adapter whenever such behaviour is acceptable. However + * in many cases it maybe more preferable to take advantage of database batch update + * functionality, and therefore default adapter implementation may not be the best option. + * <p> + * Provided implementations may be used for test purposes: + * <ul> + * <li>{@gglink org.gridgain.grid.cache.store.hibernate.GridCacheHibernateBlobStore}</li> + * <li>{@link CacheJdbcBlobStore}</li> + * </ul> + * <p> + * All transactional operations of this API are provided with ongoing {@link IgniteTx}, + * if any. As transaction is {@link GridMetadataAware}, you can attach any metadata to + * it, e.g. to recognize if several operations belong to the same transaction or not. + * Here is an example of how attach a JDBC connection as transaction metadata: + * <pre name="code" class="java"> + * Connection conn = tx.meta("some.name"); + * + * if (conn == null) { + * conn = ...; // Get JDBC connection. + * + * // Store connection in transaction metadata, so it can be accessed + * // for other operations on the same transaction. + * tx.addMeta("some.name", conn); + * } + * </pre> + * <h1 class="header">Working With Portable Objects</h1> + * When portables are enabled for cache by setting {@link GridCacheConfiguration#isPortableEnabled()} to + * {@code true}), all portable keys and values are converted to instances of {@link PortableObject}. + * Therefore, all cache store methods will take parameters in portable format. To avoid class + * cast exceptions, store must have signature compatible with portables. E.g., if you use {@link Integer} + * as a key and {@code Value} class as a value (which will be converted to portable format), cache store + * signature should be the following: + * <pre name="code" class="java"> + * public class PortableCacheStore implements GridCacheStore<Integer, GridPortableObject> { + * public void put(@Nullable GridCacheTx tx, Integer key, GridPortableObject val) throws IgniteCheckedException { + * ... + * } + * + * ... + * } + * </pre> + * This behavior can be overridden by setting {@link GridCacheConfiguration#setKeepPortableInStore(boolean)} + * flag value to {@code false}. In this case, GridGain will deserialize keys and values stored in portable + * format before they are passed to cache store, so that you can use the following cache store signature instead: + * <pre name="code" class="java"> + * public class ObjectsCacheStore implements GridCacheStore<Integer, Person> { + * public void put(@Nullable GridCacheTx tx, Integer key, Person val) throws GridException { + * ... + * } + * + * ... + * } + * </pre> + * Note that while this can simplify store implementation in some cases, it will cause performance degradation + * due to additional serializations and deserializations of portable objects. You will also need to have key + * and value classes on all nodes since portables will be deserialized when store is invoked. + * <p> + * Note that only portable classes are converted to {@link PortableObject} format. Following + * types are stored in cache without changes and therefore should not affect cache store signature: + * <ul> + * <li>All primitives (byte, int, ...) and there boxed versions (Byte, Integer, ...)</li> + * <li>Arrays of primitives (byte[], int[], ...)</li> + * <li>{@link String} and array of {@link String}s</li> + * <li>{@link UUID} and array of {@link UUID}s</li> + * <li>{@link Date} and array of {@link Date}s</li> + * <li>{@link Timestamp} and array of {@link Timestamp}s</li> + * <li>Enums and array of enums</li> + * <li> + * Maps, collections and array of objects (but objects inside + * them will still be converted if they are portable) + * </li> + * </ul> + * + * @see IgnitePortables + */ +public interface CacheStore<K, V> extends CacheLoader<K, V>, CacheWriter<K, V> { + /** + * Loads all values from underlying persistent storage. Note that keys are not + * passed, so it is up to implementation to figure out what to load. This method + * is called whenever {@link GridCache#loadCache(org.apache.ignite.lang.IgniteBiPredicate, long, Object...)} + * method is invoked which is usually to preload the cache from persistent storage. + * <p> + * This method is optional, and cache implementation does not depend on this + * method to do anything. Default implementation of this method in + * {@link CacheStoreAdapter} does nothing. + * <p> + * For every loaded value method {@link org.apache.ignite.lang.IgniteBiInClosure#apply(Object, Object)} + * should be called on the passed in closure. The closure will then make sure + * that the loaded value is stored in cache. + * + * @param clo Closure for loaded values. + * @param args Arguments passes into + * {@link GridCache#loadCache(org.apache.ignite.lang.IgniteBiPredicate, long, Object...)} method. + * @throws CacheLoaderException If loading failed. + */ + public void loadCache(IgniteBiInClosure<K, V> clo, @Nullable Object... args) throws CacheLoaderException; + + /** + * Tells store to commit or rollback a transaction depending on the value of the {@code 'commit'} + * parameter. + * + * @param commit {@code True} if transaction should commit, {@code false} for rollback. + * @throws CacheWriterException If commit or rollback failed. Note that commit failure in some cases + * may bring cache transaction into {@link IgniteTxState#UNKNOWN} which will + * consequently cause all transacted entries to be invalidated. + */ + public void txEnd(boolean commit) throws CacheWriterException; +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4b8ec5f2/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreAdapter.java b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreAdapter.java new file mode 100644 index 0000000..4ec5cbf --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreAdapter.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.store; + +import org.apache.ignite.lang.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.transactions.*; +import org.gridgain.grid.cache.*; +import org.jetbrains.annotations.*; + +import javax.cache.*; +import java.util.*; + +/** + * Cache storage convenience adapter. It provides default implementation for bulk operations, such + * as {@link #loadAll(Iterable)}, + * {@link #writeAll(Collection)}, and {@link #deleteAll(Collection)} + * by sequentially calling corresponding {@link #load(Object)}, + * {@link #write(Cache.Entry)}, and {@link #delete(Object)} + * operations. Use this adapter whenever such behaviour is acceptable. However in many cases + * it maybe more preferable to take advantage of database batch update functionality, and therefore + * default adapter implementation may not be the best option. + * <p> + * Note that method {@link #loadCache(org.apache.ignite.lang.IgniteBiInClosure, Object...)} has empty + * implementation because it is essentially up to the user to invoke it with + * specific arguments. + */ +public abstract class CacheStoreAdapter<K, V> implements CacheStore<K, V> { + /** */ + @IgniteCacheSessionResource + private CacheStoreSession ses; + + /** + * Default empty implementation. This method needs to be overridden only if + * {@link GridCache#loadCache(IgniteBiPredicate, long, Object...)} method + * is explicitly called. + * + * @param clo {@inheritDoc} + * @param args {@inheritDoc} + */ + @Override public void loadCache(IgniteBiInClosure<K, V> clo, Object... args) { + /* No-op. */ + } + + /** {@inheritDoc} */ + @Override public Map<K, V> loadAll(Iterable<? extends K> keys) { + assert keys != null; + + Map<K, V> loaded = new HashMap<>(); + + for (K key : keys) { + V v = load(key); + + if (v != null) + loaded.put(key, v); + } + + return loaded; + } + + /** {@inheritDoc} */ + @Override public void writeAll(Collection<Cache.Entry<? extends K, ? extends V>> entries) { + assert entries != null; + + for (Cache.Entry<? extends K, ? extends V> e : entries) + write(e); + } + + /** {@inheritDoc} */ + @Override public void deleteAll(Collection<?> keys) { + assert keys != null; + + for (Object key : keys) + delete(key); + } + + /** + * Default empty implementation for ending transactions. Note that if explicit cache + * transactions are not used, then transactions do not have to be explicitly ended - + * for all other cases this method should be overridden with custom commit/rollback logic. + * + * @param commit {@inheritDoc} + */ + @Override public void txEnd(boolean commit) { + // No-op. + } + + /** + * @return Current session. + */ + @Nullable protected CacheStoreSession session() { + return ses; + } + + /** + * @return Current transaction. + */ + @Nullable protected IgniteTx transaction() { + return ses != null ? ses.transaction() : null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4b8ec5f2/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreBalancingWrapper.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreBalancingWrapper.java b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreBalancingWrapper.java new file mode 100644 index 0000000..c76d4ec --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreBalancingWrapper.java @@ -0,0 +1,293 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.store; + +import org.apache.ignite.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.transactions.*; +import org.gridgain.grid.util.future.*; +import org.gridgain.grid.util.typedef.*; +import org.jdk8.backport.*; +import org.jetbrains.annotations.*; + +import javax.cache.*; +import javax.cache.integration.*; +import java.util.*; +import java.util.concurrent.*; + +/** + * Cache store wrapper that ensures that there will be no more that one thread loading value from underlying store. + */ +public class CacheStoreBalancingWrapper<K, V> implements CacheStore<K, V> { + /** */ + public static final int DFLT_LOAD_ALL_THRESHOLD = 5; + + /** Delegate store. */ + private CacheStore<K, V> delegate; + + /** Pending cache store loads. */ + private ConcurrentMap<K, LoadFuture> pendingLoads = new ConcurrentHashMap8<>(); + + /** Load all threshold. */ + private int loadAllThreshold = DFLT_LOAD_ALL_THRESHOLD; + + /** + * @param delegate Delegate store. + */ + public CacheStoreBalancingWrapper(CacheStore<K, V> delegate) { + this.delegate = delegate; + } + + /** + * @param delegate Delegate store. + * @param loadAllThreshold Load all threshold. + */ + public CacheStoreBalancingWrapper(CacheStore<K, V> delegate, int loadAllThreshold) { + this.delegate = delegate; + this.loadAllThreshold = loadAllThreshold; + } + + /** + * @return Load all threshold. + */ + public int loadAllThreshold() { + return loadAllThreshold; + } + + /** {@inheritDoc} */ + @Nullable @Override public V load(K key) { + LoadFuture fut = pendingLoads.get(key); + + try { + if (fut != null) + return fut.get(key); + + fut = new LoadFuture(); + + LoadFuture old = pendingLoads.putIfAbsent(key, fut); + + if (old != null) + return old.get(key); + } + catch (IgniteCheckedException e) { + throw new CacheLoaderException(e); + } + + try { + V val = delegate.load(key); + + fut.onComplete(key, val); + + return val; + } + catch (Throwable e) { + fut.onError(key, e); + + throw e; + } + } + + /** {@inheritDoc} */ + @Override public void loadCache(IgniteBiInClosure<K, V> clo, @Nullable Object... args) { + delegate.loadCache(clo, args); + } + + /** {@inheritDoc} */ + @Override public Map<K, V> loadAll(Iterable<? extends K> keys) throws CacheLoaderException { + return delegate.loadAll(keys); + } + + /** + * @param keys Keys to load. + * @param c Closure for loaded values. + */ + public void loadAll(Collection<? extends K> keys, final IgniteBiInClosure<K, V> c) { + assert keys.size() < loadAllThreshold; + + Collection<K> needLoad = null; + Map<K, LoadFuture> pending = null; + LoadFuture span = null; + + for (K key : keys) { + LoadFuture fut = pendingLoads.get(key); + + if (fut != null) { + if (pending == null) + pending = new HashMap<>(); + + pending.put(key, fut); + } + else { + // Try to concurrently add pending future. + if (span == null) + span = new LoadFuture(); + + LoadFuture old = pendingLoads.putIfAbsent(key, span); + + if (old != null) { + if (pending == null) + pending = new HashMap<>(); + + pending.put(key, old); + } + else { + if (needLoad == null) + needLoad = new ArrayList<>(keys.size()); + + needLoad.add(key); + } + } + } + + if (needLoad != null) { + assert !needLoad.isEmpty(); + assert span != null; + + try { + Map<K, V> loaded = delegate.loadAll(needLoad); + + for (Map.Entry<K, V> e : loaded.entrySet()) + c.apply(e.getKey(), e.getValue()); + + span.onComplete(needLoad, loaded); + } + catch (Throwable e) { + span.onError(needLoad, e); + + throw e; + } + } + + if (pending != null) { + try { + for (Map.Entry<K, LoadFuture> e : pending.entrySet()) { + K key = e.getKey(); + + c.apply(key, e.getValue().get(key)); + } + } + catch (IgniteCheckedException e) { + throw new CacheLoaderException(e); + } + } + } + + /** {@inheritDoc} */ + @Override public void write(Cache.Entry<? extends K, ? extends V> entry) { + delegate.write(entry); + } + + /** {@inheritDoc} */ + @Override public void writeAll(Collection<Cache.Entry<? extends K, ? extends V>> entries) { + delegate.writeAll(entries); + } + + /** {@inheritDoc} */ + @Override public void delete(Object key) throws CacheWriterException { + delegate.delete(key); + } + + /** {@inheritDoc} */ + @Override public void deleteAll(Collection<?> keys) throws CacheWriterException { + delegate.deleteAll(keys); + } + + /** {@inheritDoc} */ + @Override public void txEnd(boolean commit) { + delegate.txEnd(commit); + } + + /** + * + */ + private class LoadFuture extends GridFutureAdapter<Map<K, V>> { + /** */ + private static final long serialVersionUID = 0L; + + /** Collection of keys for pending cleanup. */ + private volatile Collection<K> keys; + + /** + * + */ + public LoadFuture() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public boolean onDone(@Nullable Map<K, V> res, @Nullable Throwable err) { + if (super.onDone(res, err)) { + assert keys != null; + + for (K key : keys) + pendingLoads.remove(key, this); + + return true; + } + + return false; + } + + /** + * @param key Key. + * @param val Loaded value. + */ + public void onComplete(K key, V val) { + onComplete(Collections.singletonList(key), F.asMap(key, val)); + } + + /** + * @param keys Keys. + * @param res Loaded values. + */ + public void onComplete(Collection<K> keys, Map<K, V> res) { + this.keys = keys; + + onDone(res); + } + + /** + * @param key Key. + * @param err Error. + */ + public void onError(K key, Throwable err) { + + } + + /** + * @param keys Keys. + * @param err Error. + */ + public void onError(Collection<K> keys, Throwable err) { + this.keys = keys; + + onDone(err); + } + + /** + * Gets value loaded for key k. + * + * @param key Key to load. + * @return Loaded value (possibly {@code null}). + * @throws IgniteCheckedException If load failed. + */ + public V get(K key) throws IgniteCheckedException { + return get().get(key); + } + } +}