# ignite-42

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/4b8ec5f2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/4b8ec5f2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/4b8ec5f2

Branch: refs/heads/ignite-42
Commit: 4b8ec5f230456293084d9f2224bd5bda90520a81
Parents: 806ce6a
Author: sboikov <sboi...@gridgain.com>
Authored: Thu Jan 15 09:43:58 2015 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Thu Jan 15 17:42:04 2015 +0300

----------------------------------------------------------------------
 .../store/dummy/CacheDummyPersonStore.java      |  20 +-
 .../hibernate/CacheHibernatePersonStore.java    |  55 +-
 .../store/jdbc/CacheJdbcPersonStore.java        |  50 +-
 .../org/gridgain/client/GridHashMapStore.java   |  16 +-
 .../integration/GridClientAbstractSelfTest.java |  16 +-
 .../java/org/apache/ignite/IgniteCache.java     |  39 +-
 .../cache/store/CacheLoadOnlyStoreAdapter.java  | 325 +++++++++++
 .../ignite/cache/store/CacheLocalStore.java     |  31 +
 .../apache/ignite/cache/store/CacheStore.java   | 154 +++++
 .../ignite/cache/store/CacheStoreAdapter.java   | 116 ++++
 .../cache/store/CacheStoreBalancingWrapper.java | 293 ++++++++++
 .../cache/store/jdbc/CacheJdbcBlobStore.java    | 573 +++++++++++++++++++
 .../apache/ignite/cache/store/jdbc/package.html |  24 +
 .../org/apache/ignite/cache/store/package.html  |  23 +
 .../java/org/gridgain/grid/cache/GridCache.java |  13 +-
 .../grid/cache/GridCacheConfiguration.java      |  16 +-
 .../grid/cache/GridCacheProjection.java         | 143 +++--
 .../store/GridCacheLoadOnlyStoreAdapter.java    | 328 -----------
 .../grid/cache/store/GridCacheLocalStore.java   |  31 -
 .../grid/cache/store/GridCacheStore.java        | 220 -------
 .../grid/cache/store/GridCacheStoreAdapter.java | 113 ----
 .../store/GridCacheStoreBalancingWrapper.java   | 278 ---------
 .../store/jdbc/GridCacheJdbcBlobStore.java      | 552 ------------------
 .../gridgain/grid/cache/store/jdbc/package.html |  24 -
 .../org/gridgain/grid/cache/store/package.html  |  23 -
 .../kernal/processors/cache/CacheEntryImpl.java |  60 ++
 .../cache/GridCacheLoaderWriterStore.java       | 121 +---
 .../processors/cache/GridCacheProcessor.java    |   6 +-
 .../processors/cache/GridCacheProjectionEx.java |  17 +-
 .../processors/cache/GridCacheStoreManager.java |  92 ++-
 .../cache/GridCacheWriteBehindStore.java        |  74 +--
 ...CacheJdbcBlobStoreMultithreadedSelfTest.java | 243 ++++++++
 .../jdbc/GridCacheJdbcBlobStoreSelfTest.java    |  51 ++
 .../apache/ignite/cache/store/jdbc/package.html |  23 +
 .../cache/IgniteCacheAbstractTest.java          |  12 +-
 ...niteCacheAtomicLocalWithStoreInvokeTest.java |   4 +-
 ...micPrimaryWriteOrderWithStoreInvokeTest.java |   4 +-
 ...maryWriteOrderWithStoreExpiryPolicyTest.java |   4 +-
 ...iteCacheAtomicWithStoreExpiryPolicyTest.java |   4 +-
 .../IgniteCacheTxWithStoreExpiryPolicyTest.java |   4 +-
 .../store/GridCacheBalancingStoreSelfTest.java  |  43 +-
 .../GridCacheLoadOnlyStoreAdapterSelfTest.java  |   6 +-
 .../cache/store/GridGeneratingTestStore.java    |  38 +-
 ...CacheJdbcBlobStoreMultithreadedSelfTest.java | 243 --------
 .../jdbc/GridCacheJdbcBlobStoreSelfTest.java    |  51 --
 .../gridgain/grid/cache/store/jdbc/package.html |  23 -
 .../cache/GridCacheAbstractFlagsTest.java       |   4 +-
 .../cache/GridCacheAbstractSelfTest.java        |  12 +-
 .../cache/GridCacheBasicStoreAbstractTest.java  |   4 +-
 ...acheBasicStoreMultithreadedAbstractTest.java |  12 +-
 ...idCacheConfigurationConsistencySelfTest.java |  31 +-
 .../cache/GridCacheGenericTestStore.java        |  77 +--
 .../GridCacheGroupLockAbstractSelfTest.java     |  16 +-
 .../cache/GridCacheLifecycleAwareSelfTest.java  |  25 +-
 ...ridCacheMultinodeUpdateAbstractSelfTest.java |   4 +-
 .../cache/GridCachePartitionedWritesTest.java   |  13 +-
 .../cache/GridCacheReloadSelfTest.java          |  11 +-
 .../cache/GridCacheStorePutxSelfTest.java       |  30 +-
 .../cache/GridCacheSwapReloadSelfTest.java      |  12 +-
 .../processors/cache/GridCacheTestStore.java    | 109 ++--
 ...idCacheWriteBehindStoreAbstractSelfTest.java |  13 +-
 .../GridCacheWriteBehindStoreSelfTest.java      |  50 +-
 .../IgniteTxStoreExceptionAbstractSelfTest.java |  36 +-
 ...CacheAtomicReferenceApiSelfAbstractTest.java |   4 +-
 ...chePartitionedReloadAllAbstractSelfTest.java |  12 +-
 .../dht/GridCacheColocatedDebugTest.java        |   6 +-
 .../dht/GridCacheGlobalLoadTest.java            |  15 +-
 .../near/GridCacheGetStoreErrorSelfTest.java    |  15 +-
 .../near/GridCacheNearMultiNodeSelfTest.java    |  11 +-
 .../near/GridCacheNearOneNodeSelfTest.java      |  11 +-
 .../GridCacheNearPartitionedClearSelfTest.java  |   4 +-
 .../GridCachePartitionedLoadCacheSelfTest.java  |  13 +-
 .../GridCachePartitionedStorePutSelfTest.java   |  12 +-
 .../near/GridPartitionedBackupLoadSelfTest.java |  14 +-
 .../GridCacheBatchEvictUnswapSelfTest.java      |  13 +-
 .../GridCacheEmptyEntriesAbstractSelfTest.java  |  15 +-
 .../GridCacheEvictionTouchSelfTest.java         |  11 +-
 ...dCacheAtomicLocalMetricsNoStoreSelfTest.java |   4 +-
 .../local/GridCacheLocalLoadAllSelfTest.java    |  15 +-
 ...ridCacheContinuousQueryAbstractSelfTest.java |  15 +-
 .../GridCacheWriteBehindStoreLoadTest.java      |  11 +-
 .../colocation/GridTestCacheStore.java          |  22 +-
 .../swap/GridSwapEvictAllBenchmark.java         |  13 +-
 .../testframework/junits/GridTestResources.java |   7 +
 .../cache/GridAbstractCacheStoreSelfTest.java   | 174 +++---
 .../junits/cache/TestCacheSession.java          |  59 ++
 .../cache/TestThreadLocalCacheSession.java      |  57 ++
 .../bamboo/GridDataGridTestSuite.java           |   2 +-
 .../hibernate/GridCacheHibernateBlobStore.java  |  67 ++-
 .../GridCacheHibernateBlobStoreSelfTest.java    |   6 +-
 .../cache/GridCacheAbstractQuerySelfTest.java   |  12 +-
 .../cache/GridCacheQueryLoadSelfTest.java       |  16 +-
 92 files changed, 2931 insertions(+), 2763 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4b8ec5f2/examples/src/main/java/org/gridgain/examples/datagrid/store/dummy/CacheDummyPersonStore.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/gridgain/examples/datagrid/store/dummy/CacheDummyPersonStore.java b/examples/src/main/java/org/gridgain/examples/datagrid/store/dummy/CacheDummyPersonStore.java
index 48390ac..f4b6553 100644
--- a/examples/src/main/java/org/gridgain/examples/datagrid/store/dummy/CacheDummyPersonStore.java
+++ b/examples/src/main/java/org/gridgain/examples/datagrid/store/dummy/CacheDummyPersonStore.java
@@ -18,12 +18,12 @@
 package org.gridgain.examples.datagrid.store.dummy;
 
 import org.apache.ignite.*;
+import org.apache.ignite.cache.store.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.resources.*;
 import org.apache.ignite.transactions.*;
 import org.gridgain.examples.datagrid.store.*;
 import org.gridgain.grid.cache.*;
-import org.gridgain.grid.cache.store.*;
 import org.jetbrains.annotations.*;
 
 import java.util.*;
@@ -32,41 +32,47 @@ import java.util.concurrent.*;
 /**
  * Dummy cache store implementation.
  */
-public class CacheDummyPersonStore extends GridCacheStoreAdapter<Long, Person> 
{
+public class CacheDummyPersonStore extends CacheStoreAdapter<Long, Person> {
     /** Auto-inject grid instance. */
     @IgniteInstanceResource
     private Ignite ignite;
 
     /** Auto-inject cache name. */
-    @GridCacheName
+    @IgniteCacheNameResource
     private String cacheName;
 
     /** Dummy database. */
     private Map<Long, Person> dummyDB = new ConcurrentHashMap<>();
 
     /** {@inheritDoc} */
-    @Override public Person load(@Nullable IgniteTx tx, Long key) throws 
IgniteCheckedException {
+    @Override public Person load(Long key) {
+        IgniteTx tx = transaction();
+
         System.out.println(">>> Store load [key=" + key + ", xid=" + (tx == 
null ? null : tx.xid()) + ']');
 
         return dummyDB.get(key);
     }
 
     /** {@inheritDoc} */
-    @Override public void put(@Nullable IgniteTx tx, Long key, Person val) 
throws IgniteCheckedException {
+    @Override public void put(Long key, Person val) {
+        IgniteTx tx = transaction();
+
         System.out.println(">>> Store put [key=" + key + ", val=" + val + ", 
xid=" + (tx == null ? null : tx.xid()) + ']');
 
         dummyDB.put(key, val);
     }
 
     /** {@inheritDoc} */
-    @Override public void remove(@Nullable IgniteTx tx, Long key) throws 
IgniteCheckedException {
+    @Override public void remove(Long key) {
+        IgniteTx tx = transaction();
+
         System.out.println(">>> Store remove [key=" + key + ", xid=" + (tx == 
null ? null : tx.xid()) + ']');
 
         dummyDB.remove(key);
     }
 
     /** {@inheritDoc} */
-    @Override public void loadCache(IgniteBiInClosure<Long, Person> clo, 
Object... args) throws IgniteCheckedException {
+    @Override public void loadCache(IgniteBiInClosure<Long, Person> clo, 
Object... args) {
         int cnt = (Integer)args[0];
 
         System.out.println(">>> Store loadCache for entry count: " + cnt);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4b8ec5f2/examples/src/main/java/org/gridgain/examples/datagrid/store/hibernate/CacheHibernatePersonStore.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/gridgain/examples/datagrid/store/hibernate/CacheHibernatePersonStore.java b/examples/src/main/java/org/gridgain/examples/datagrid/store/hibernate/CacheHibernatePersonStore.java
index cc0bbc1..c671108 100644
--- a/examples/src/main/java/org/gridgain/examples/datagrid/store/hibernate/CacheHibernatePersonStore.java
+++ b/examples/src/main/java/org/gridgain/examples/datagrid/store/hibernate/CacheHibernatePersonStore.java
@@ -17,22 +17,22 @@
 
 package org.gridgain.examples.datagrid.store.hibernate;
 
-import org.apache.ignite.*;
+import org.apache.ignite.cache.store.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.transactions.*;
 import org.gridgain.examples.datagrid.store.*;
-import org.gridgain.grid.cache.store.*;
 import org.hibernate.*;
 import org.hibernate.cfg.*;
 import org.jetbrains.annotations.*;
 
+import javax.cache.integration.*;
 import java.util.*;
 
 /**
- * Example of {@link GridCacheStore} implementation that uses Hibernate
+ * Example of {@link CacheStore} implementation that uses Hibernate
  * and deals with maps {@link UUID} to {@link Person}.
  */
-public class CacheHibernatePersonStore extends GridCacheStoreAdapter<Long, 
Person> {
+public class CacheHibernatePersonStore extends CacheStoreAdapter<Long, Person> 
{
     /** Default hibernate configuration resource path. */
     private static final String DFLT_HIBERNATE_CFG = 
"/org/gridgain/examples/datagrid/store/hibernate/hibernate.cfg.xml";
 
@@ -50,7 +50,9 @@ public class CacheHibernatePersonStore extends GridCacheStoreAdapter<Long, Perso
     }
 
     /** {@inheritDoc} */
-    @Override public Person load(@Nullable IgniteTx tx, Long key) throws 
IgniteCheckedException {
+    @Override public Person load(Long key) {
+        IgniteTx tx = transaction();
+
         System.out.println(">>> Store load [key=" + key + ", xid=" + (tx == 
null ? null : tx.xid()) + ']');
 
         Session ses = session(tx);
@@ -61,7 +63,7 @@ public class CacheHibernatePersonStore extends GridCacheStoreAdapter<Long, Perso
         catch (HibernateException e) {
             rollback(ses, tx);
 
-            throw new IgniteCheckedException("Failed to load value from cache 
store with key: " + key, e);
+            throw new CacheLoaderException("Failed to load value from cache 
store with key: " + key, e);
         }
         finally {
             end(ses, tx);
@@ -69,12 +71,13 @@ public class CacheHibernatePersonStore extends GridCacheStoreAdapter<Long, Perso
     }
 
     /** {@inheritDoc} */
-    @Override public void put(@Nullable IgniteTx tx, Long key, @Nullable 
Person val)
-        throws IgniteCheckedException {
+    @Override public void put(Long key, @Nullable Person val) {
+        IgniteTx tx = transaction();
+
         System.out.println(">>> Store put [key=" + key + ", val=" + val + ", 
xid=" + (tx == null ? null : tx.xid()) + ']');
 
         if (val == null) {
-            remove(tx, key);
+            remove(key);
 
             return;
         }
@@ -87,7 +90,7 @@ public class CacheHibernatePersonStore extends GridCacheStoreAdapter<Long, Perso
         catch (HibernateException e) {
             rollback(ses, tx);
 
-            throw new IgniteCheckedException("Failed to put value to cache 
store [key=" + key + ", val" + val + "]", e);
+            throw new CacheWriterException("Failed to put value to cache store 
[key=" + key + ", val" + val + "]", e);
         }
         finally {
             end(ses, tx);
@@ -96,7 +99,9 @@ public class CacheHibernatePersonStore extends GridCacheStoreAdapter<Long, Perso
 
     /** {@inheritDoc} */
     @SuppressWarnings({"JpaQueryApiInspection"})
-    @Override public void remove(@Nullable IgniteTx tx, Long key) throws 
IgniteCheckedException {
+    @Override public void remove(Long key) {
+        IgniteTx tx = transaction();
+
         System.out.println(">>> Store remove [key=" + key + ", xid=" + (tx == 
null ? null : tx.xid()) + ']');
 
         Session ses = session(tx);
@@ -108,7 +113,7 @@ public class CacheHibernatePersonStore extends GridCacheStoreAdapter<Long, Perso
         catch (HibernateException e) {
             rollback(ses, tx);
 
-            throw new IgniteCheckedException("Failed to remove value from 
cache store with key: " + key, e);
+            throw new CacheWriterException("Failed to remove value from cache 
store with key: " + key, e);
         }
         finally {
             end(ses, tx);
@@ -116,9 +121,9 @@ public class CacheHibernatePersonStore extends GridCacheStoreAdapter<Long, Perso
     }
 
     /** {@inheritDoc} */
-    @Override public void loadCache(IgniteBiInClosure<Long, Person> clo, 
Object... args) throws IgniteCheckedException {
+    @Override public void loadCache(IgniteBiInClosure<Long, Person> clo, 
Object... args) {
         if (args == null || args.length == 0 || args[0] == null)
-            throw new IgniteCheckedException("Expected entry count parameter 
is not provided.");
+            throw new CacheLoaderException("Expected entry count parameter is 
not provided.");
 
         final int entryCnt = (Integer)args[0];
 
@@ -144,7 +149,7 @@ public class CacheHibernatePersonStore extends GridCacheStoreAdapter<Long, Perso
             System.out.println(">>> Loaded " + cnt + " values into cache.");
         }
         catch (HibernateException e) {
-            throw new IgniteCheckedException("Failed to load values from cache 
store.", e);
+            throw new CacheLoaderException("Failed to load values from cache 
store.", e);
         }
         finally {
             end(ses, null);
@@ -188,8 +193,14 @@ public class CacheHibernatePersonStore extends GridCacheStoreAdapter<Long, Perso
     }
 
     /** {@inheritDoc} */
-    @Override public void txEnd(IgniteTx tx, boolean commit) throws 
IgniteCheckedException {
-        Session ses = tx.removeMeta(ATTR_SES);
+    @Override public void txEnd(boolean commit) {
+        CacheStoreSession storeSes = session();
+
+        IgniteTx tx = storeSes.transaction();
+
+        Map<Object, Object> props = storeSes.properties();
+
+        Session ses = (Session)props.remove(ATTR_SES);
 
         if (ses != null) {
             Transaction hTx = ses.getTransaction();
@@ -207,7 +218,7 @@ public class CacheHibernatePersonStore extends GridCacheStoreAdapter<Long, Perso
                     System.out.println("Transaction ended [xid=" + tx.xid() + 
", commit=" + commit + ']');
                 }
                 catch (HibernateException e) {
-                    throw new IgniteCheckedException("Failed to end 
transaction [xid=" + tx.xid() +
+                    throw new CacheWriterException("Failed to end transaction 
[xid=" + tx.xid() +
                         ", commit=" + commit + ']', e);
                 }
                 finally {
@@ -227,16 +238,18 @@ public class CacheHibernatePersonStore extends GridCacheStoreAdapter<Long, Perso
         Session ses;
 
         if (tx != null) {
-            ses = tx.meta(ATTR_SES);
+            Map<Object, Object> props = session().properties();
+
+            ses = (Session)props.get(ATTR_SES);
 
             if (ses == null) {
                 ses = sesFactory.openSession();
 
                 ses.beginTransaction();
 
-                // Store session in transaction metadata, so it can be accessed
+                // Store session in session properties, so it can be accessed
                 // for other operations on the same transaction.
-                tx.addMeta(ATTR_SES, ses);
+                props.put(ATTR_SES, ses);
 
                 System.out.println("Hibernate session open [ses=" + ses + ", 
tx=" + tx.xid() + "]");
             }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4b8ec5f2/examples/src/main/java/org/gridgain/examples/datagrid/store/jdbc/CacheJdbcPersonStore.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/gridgain/examples/datagrid/store/jdbc/CacheJdbcPersonStore.java b/examples/src/main/java/org/gridgain/examples/datagrid/store/jdbc/CacheJdbcPersonStore.java
index 2aeb655..2711dba 100644
--- a/examples/src/main/java/org/gridgain/examples/datagrid/store/jdbc/CacheJdbcPersonStore.java
+++ b/examples/src/main/java/org/gridgain/examples/datagrid/store/jdbc/CacheJdbcPersonStore.java
@@ -18,21 +18,22 @@
 package org.gridgain.examples.datagrid.store.jdbc;
 
 import org.apache.ignite.*;
+import org.apache.ignite.cache.store.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.transactions.*;
 import org.gridgain.examples.datagrid.store.*;
-import org.gridgain.grid.cache.store.*;
 import org.jetbrains.annotations.*;
 
+import javax.cache.integration.*;
 import java.sql.*;
 import java.util.*;
 
 /**
- * Example of {@link GridCacheStore} implementation that uses JDBC
+ * Example of {@link CacheStore} implementation that uses JDBC
  * transaction with cache transactions and maps {@link UUID} to {@link Person}.
  *
  */
-public class CacheJdbcPersonStore extends GridCacheStoreAdapter<Long, Person> {
+public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> {
     /** Transaction metadata attribute name. */
     private static final String ATTR_NAME = "SIMPLE_STORE_CONNECTION";
 
@@ -64,8 +65,12 @@ public class CacheJdbcPersonStore extends GridCacheStoreAdapter<Long, Person> {
     }
 
     /** {@inheritDoc} */
-    @Override public void txEnd(IgniteTx tx, boolean commit) throws 
IgniteCheckedException {
-        try (Connection conn = tx.removeMeta(ATTR_NAME)) {
+    @Override public void txEnd(boolean commit) {
+        IgniteTx tx = transaction();
+
+        Map<Object, Object> props = session().properties();
+
+        try (Connection conn = (Connection)props.remove(ATTR_NAME)) {
             if (conn != null) {
                 if (commit)
                     conn.commit();
@@ -76,12 +81,14 @@ public class CacheJdbcPersonStore extends GridCacheStoreAdapter<Long, Person> {
             System.out.println(">>> Transaction ended [xid=" + tx.xid() + ", 
commit=" + commit + ']');
         }
         catch (SQLException e) {
-            throw new IgniteCheckedException("Failed to end transaction [xid=" 
+ tx.xid() + ", commit=" + commit + ']', e);
+            throw new CacheWriterException("Failed to end transaction [xid=" + 
tx.xid() + ", commit=" + commit + ']', e);
         }
     }
 
     /** {@inheritDoc} */
-    @Nullable @Override public Person load(@Nullable IgniteTx tx, Long key) 
throws IgniteCheckedException {
+    @Nullable @Override public Person load(Long key) {
+        IgniteTx tx = transaction();
+
         System.out.println(">>> Store load [key=" + key + ", xid=" + (tx == 
null ? null : tx.xid()) + ']');
 
         Connection conn = null;
@@ -99,7 +106,7 @@ public class CacheJdbcPersonStore extends GridCacheStoreAdapter<Long, Person> {
             }
         }
         catch (SQLException e) {
-            throw new IgniteCheckedException("Failed to load object: " + key, 
e);
+            throw new CacheLoaderException("Failed to load object: " + key, e);
         }
         finally {
             end(tx, conn);
@@ -109,8 +116,9 @@ public class CacheJdbcPersonStore extends GridCacheStoreAdapter<Long, Person> {
     }
 
     /** {@inheritDoc} */
-    @Override public void put(@Nullable IgniteTx tx, Long key, Person val)
-        throws IgniteCheckedException {
+    @Override public void put(Long key, Person val) {
+        IgniteTx tx = transaction();
+
         System.out.println(">>> Store put [key=" + key + ", val=" + val + ", 
xid=" + (tx == null ? null : tx.xid()) + ']');
 
         Connection conn = null;
@@ -142,7 +150,7 @@ public class CacheJdbcPersonStore extends GridCacheStoreAdapter<Long, Person> {
         }
         }
         catch (SQLException e) {
-            throw new IgniteCheckedException("Failed to put object [key=" + 
key + ", val=" + val + ']', e);
+            throw new CacheLoaderException("Failed to put object [key=" + key 
+ ", val=" + val + ']', e);
         }
         finally {
             end(tx, conn);
@@ -150,7 +158,9 @@ public class CacheJdbcPersonStore extends GridCacheStoreAdapter<Long, Person> {
     }
 
     /** {@inheritDoc} */
-    @Override public void remove(@Nullable IgniteTx tx, Long key) throws 
IgniteCheckedException {
+    @Override public void remove(Long key) {
+        IgniteTx tx = transaction();
+
         System.out.println(">>> Store remove [key=" + key + ", xid=" + (tx == 
null ? null : tx.xid()) + ']');
 
         Connection conn = null;
@@ -165,7 +175,7 @@ public class CacheJdbcPersonStore extends GridCacheStoreAdapter<Long, Person> {
             }
         }
         catch (SQLException e) {
-            throw new IgniteCheckedException("Failed to remove object: " + 
key, e);
+            throw new CacheLoaderException("Failed to remove object: " + key, 
e);
         }
         finally {
             end(tx, conn);
@@ -173,9 +183,9 @@ public class CacheJdbcPersonStore extends GridCacheStoreAdapter<Long, Person> {
     }
 
     /** {@inheritDoc} */
-    @Override public void loadCache(IgniteBiInClosure<Long, Person> clo, 
Object... args) throws IgniteCheckedException {
+    @Override public void loadCache(IgniteBiInClosure<Long, Person> clo, 
Object... args) {
         if (args == null || args.length == 0 || args[0] == null)
-            throw new IgniteCheckedException("Expected entry count parameter 
is not provided.");
+            throw new CacheLoaderException("Expected entry count parameter is 
not provided.");
 
         final int entryCnt = (Integer)args[0];
 
@@ -201,7 +211,7 @@ public class CacheJdbcPersonStore extends GridCacheStoreAdapter<Long, Person> {
             }
         }
         catch (SQLException e) {
-            throw new IgniteCheckedException("Failed to load values from cache 
store.", e);
+            throw new CacheLoaderException("Failed to load values from cache 
store.", e);
         }
         finally {
             end(null, conn);
@@ -215,14 +225,16 @@ public class CacheJdbcPersonStore extends GridCacheStoreAdapter<Long, Person> {
      */
     private Connection connection(@Nullable IgniteTx tx) throws SQLException  {
         if (tx != null) {
-            Connection conn = tx.meta(ATTR_NAME);
+            Map<Object, Object> props = session().properties();
+
+            Connection conn = (Connection)props.get(ATTR_NAME);
 
             if (conn == null) {
                 conn = openConnection(false);
 
-                // Store connection in transaction metadata, so it can be 
accessed
+                // Store connection in session properties, so it can be 
accessed
                 // for other operations on the same transaction.
-                tx.addMeta(ATTR_NAME, conn);
+                props.put(ATTR_NAME, conn);
             }
 
             return conn;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4b8ec5f2/modules/clients/src/test/java/org/gridgain/client/GridHashMapStore.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/gridgain/client/GridHashMapStore.java b/modules/clients/src/test/java/org/gridgain/client/GridHashMapStore.java
index 67fc50d..5b0fbe7 100644
--- a/modules/clients/src/test/java/org/gridgain/client/GridHashMapStore.java
+++ b/modules/clients/src/test/java/org/gridgain/client/GridHashMapStore.java
@@ -18,9 +18,9 @@
 package org.gridgain.client;
 
 import org.apache.ignite.*;
+import org.apache.ignite.cache.store.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.transactions.*;
-import org.gridgain.grid.cache.store.*;
 import org.jetbrains.annotations.*;
 
 import java.util.*;
@@ -28,32 +28,28 @@ import java.util.*;
 /**
  * Simple HashMap based cache store emulation.
  */
-public class GridHashMapStore extends GridCacheStoreAdapter {
+public class GridHashMapStore extends CacheStoreAdapter {
     /** Map for cache store. */
     private final Map<Object, Object> map = new HashMap<>();
 
     /** {@inheritDoc} */
-    @Override public void loadCache(IgniteBiInClosure c, Object... args)
-        throws IgniteCheckedException {
+    @Override public void loadCache(IgniteBiInClosure c, Object... args) {
         for (Map.Entry e : map.entrySet())
             c.apply(e.getKey(), e.getValue());
     }
 
     /** {@inheritDoc} */
-    @Override public Object load(@Nullable IgniteTx tx, Object key)
-        throws IgniteCheckedException {
+    @Override public Object load(Object key) {
         return map.get(key);
     }
 
     /** {@inheritDoc} */
-    @Override public void put(@Nullable IgniteTx tx, Object key,
-        @Nullable Object val) throws IgniteCheckedException {
+    @Override public void put(Object key, @Nullable Object val) {
         map.put(key, val);
     }
 
     /** {@inheritDoc} */
-    @Override public void remove(@Nullable IgniteTx tx, Object key)
-        throws IgniteCheckedException {
+    @Override public void remove(Object key) {
         map.remove(key);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4b8ec5f2/modules/clients/src/test/java/org/gridgain/client/integration/GridClientAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/gridgain/client/integration/GridClientAbstractSelfTest.java b/modules/clients/src/test/java/org/gridgain/client/integration/GridClientAbstractSelfTest.java
index 58d6894..6e8d1a3 100644
--- a/modules/clients/src/test/java/org/gridgain/client/integration/GridClientAbstractSelfTest.java
+++ b/modules/clients/src/test/java/org/gridgain/client/integration/GridClientAbstractSelfTest.java
@@ -20,6 +20,7 @@ package org.gridgain.client.integration;
 import junit.framework.*;
 import net.sf.json.*;
 import org.apache.ignite.*;
+import org.apache.ignite.cache.store.*;
 import org.apache.ignite.compute.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.lang.*;
@@ -32,7 +33,6 @@ import org.gridgain.client.*;
 import org.gridgain.client.ssl.*;
 import org.gridgain.grid.cache.*;
 import org.gridgain.grid.cache.affinity.consistenthash.*;
-import org.gridgain.grid.cache.store.*;
 import org.gridgain.grid.util.typedef.*;
 import org.gridgain.grid.util.typedef.internal.*;
 import org.gridgain.testframework.junits.common.*;
@@ -1564,33 +1564,29 @@ public abstract class GridClientAbstractSelfTest extends GridCommonAbstractTest
     /**
      * Simple HashMap based cache store emulation.
      */
-    private static class HashMapStore extends GridCacheStoreAdapter<Object, 
Object> {
+    private static class HashMapStore extends CacheStoreAdapter<Object, 
Object> {
         /** Map for cache store. */
         private final Map<Object, Object> map = new HashMap<>();
 
         /** {@inheritDoc} */
-        @Override public void loadCache(IgniteBiInClosure<Object, Object> clo, 
Object... args)
-            throws IgniteCheckedException {
+        @Override public void loadCache(IgniteBiInClosure<Object, Object> clo, 
Object... args) {
             for (Map.Entry e : map.entrySet()) {
                 clo.apply(e.getKey(), e.getValue());
             }
         }
 
         /** {@inheritDoc} */
-        @Override public Object load(@Nullable IgniteTx tx, Object key)
-            throws IgniteCheckedException {
+        @Override public Object load(Object key) {
             return map.get(key);
         }
 
         /** {@inheritDoc} */
-        @Override public void put(@Nullable IgniteTx tx, Object key,
-            @Nullable Object val) throws IgniteCheckedException {
+        @Override public void put(Object key, @Nullable Object val) {
             map.put(key, val);
         }
 
         /** {@inheritDoc} */
-        @Override public void remove(@Nullable IgniteTx tx, Object key)
-            throws IgniteCheckedException {
+        @Override public void remove(Object key) {
             map.remove(key);
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4b8ec5f2/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
index 2988005..30850e6 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
@@ -22,7 +22,6 @@ import org.apache.ignite.cache.query.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.transactions.*;
 import org.gridgain.grid.cache.*;
-import org.gridgain.grid.cache.store.*;
 import org.jetbrains.annotations.*;
 
 import javax.cache.*;
@@ -95,20 +94,20 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS
      * @param p Optional predicate (may be {@code null}). If provided, will be 
used to
      *      filter values to be put into cache.
      * @param args Optional user arguments to be passed into
-     *      {@link GridCacheStore#loadCache(IgniteBiInClosure, Object...)} 
method.
+     *      {@link 
org.apache.ignite.cache.store.CacheStore#loadCache(IgniteBiInClosure, 
Object...)} method.
      * @throws CacheException If loading failed.
      */
     public void loadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable 
Object... args) throws CacheException;
 
     /**
-     * Delegates to {@link 
GridCacheStore#loadCache(IgniteBiInClosure,Object...)} method
+     * Delegates to {@link 
org.apache.ignite.cache.store.CacheStore#loadCache(IgniteBiInClosure,Object...)}
 method
      * to load state from the underlying persistent storage. The loaded values
      * will then be given to the optionally passed in predicate, and, if the 
predicate returns
      * {@code true}, will be stored in cache. If predicate is {@code null}, 
then
      * all loaded values will be stored in cache.
      * <p>
      * Note that this method does not receive keys as a parameter, so it is up 
to
-     * {@link GridCacheStore} implementation to provide all the data to be 
loaded.
+     * {@link org.apache.ignite.cache.store.CacheStore} implementation to 
provide all the data to be loaded.
      * <p>
      * This method is not transactional and may end up loading a stale value 
into
      * cache if another thread has updated the value immediately after it has 
been
@@ -118,7 +117,7 @@ public interface IgniteCache<K, V> extends 
javax.cache.Cache<K, V>, IgniteAsyncS
      * @param p Optional predicate (may be {@code null}). If provided, will be 
used to
      *      filter values to be put into cache.
      * @param args Optional user arguments to be passed into
-     *      {@link GridCacheStore#loadCache(IgniteBiInClosure, Object...)} 
method.
+     *      {@link 
org.apache.ignite.cache.store.CacheStore#loadCache(IgniteBiInClosure, 
Object...)} method.
      * @throws CacheException If loading failed.
      */
     public void localLoadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable 
Object... args) throws CacheException;
@@ -130,14 +129,14 @@ public interface IgniteCache<K, V> extends 
javax.cache.Cache<K, V>, IgniteAsyncS
      * the value will be loaded from the primary node, which in its turn may 
load the value
      * from the swap storage, and consecutively, if it's not in swap,
      * from the underlying persistent storage. If value has to be loaded from 
persistent
-     * storage, {@link GridCacheStore#load(IgniteTx, Object)} method will be 
used.
+     * storage, {@link org.apache.ignite.cache.store.CacheStore#load(IgniteTx, 
Object)} method will be used.
      * <p>
      * If the returned value is not needed, method {@link #putIfAbsent(Object, 
Object)} should
      * always be used instead of this one to avoid the overhead associated 
with returning of the
      * previous value.
      * <p>
-     * If write-through is enabled, the stored value will be persisted to 
{@link GridCacheStore}
-     * via {@link GridCacheStore#put(IgniteTx, Object, Object)} method.
+     * If write-through is enabled, the stored value will be persisted to 
{@link org.apache.ignite.cache.store.CacheStore}
+     * via {@link org.apache.ignite.cache.store.CacheStore#put(IgniteTx, 
Object, Object)} method.
      * <h2 class="header">Transactions</h2>
      * This method is transactional and will enlist the entry into ongoing 
transaction
      * if there is one.
@@ -164,8 +163,8 @@ public interface IgniteCache<K, V> extends 
javax.cache.Cache<K, V>, IgniteAsyncS
      * are acquired in undefined order, so it may cause a deadlock when used 
with
      * other concurrent transactional updates.
      * <p>
-     * If write-through is enabled, the values will be removed from {@link 
GridCacheStore}
-     * via {@link GridCacheStore#removeAll(IgniteTx, java.util.Collection)} 
method.
+     * If write-through is enabled, the values will be removed from {@link 
org.apache.ignite.cache.store.CacheStore}
+     * via {@link org.apache.ignite.cache.store.CacheStore#removeAll(IgniteTx, 
java.util.Collection)} method.
      * <h2 class="header">Transactions</h2>
      * This method is transactional and will enlist the entry into ongoing 
transaction
      * if there is one.
@@ -319,13 +318,13 @@ public interface IgniteCache<K, V> extends 
javax.cache.Cache<K, V>, IgniteAsyncS
      * the value will be loaded from the primary node, which in its turn may 
load the value
      * from the swap storage, and consecutively, if it's not in swap,
      * from the underlying persistent storage. If value has to be loaded from 
persistent
-     * storage,  {@link GridCacheStore#load(IgniteTx, Object)} method will be 
used.
+     * storage,  {@link 
org.apache.ignite.cache.store.CacheStore#load(IgniteTx, Object)} method will be 
used.
      * <p>
      * If the returned value is not needed, method {@link #putIf(Object, 
Object, IgnitePredicate)} should
      * always be used instead of this one to avoid the overhead associated 
with returning of the previous value.
      * <p>
-     * If write-through is enabled, the stored value will be persisted to 
{@link GridCacheStore}
-     * via {@link GridCacheStore#put(IgniteTx, Object, Object)} method.
+     * If write-through is enabled, the stored value will be persisted to 
{@link org.apache.ignite.cache.store.CacheStore}
+     * via {@link org.apache.ignite.cache.store.CacheStore#put(IgniteTx, 
Object, Object)} method.
      * <h2 class="header">Transactions</h2>
      * This method is transactional and will enlist the entry into ongoing 
transaction
      * if there is one.
@@ -356,8 +355,8 @@ public interface IgniteCache<K, V> extends 
javax.cache.Cache<K, V>, IgniteAsyncS
      * value and, therefore, does not have any overhead associated with 
returning a value. It
      * should be used whenever return value is not required.
      * <p>
-     * If write-through is enabled, the stored value will be persisted to 
{@link GridCacheStore}
-     * via {@link GridCacheStore#put(IgniteTx, Object, Object)} method.
+     * If write-through is enabled, the stored value will be persisted to 
{@link org.apache.ignite.cache.store.CacheStore}
+     * via {@link org.apache.ignite.cache.store.CacheStore#put(IgniteTx, 
Object, Object)} method.
      * <h2 class="header">Transactions</h2>
      * This method is transactional and will enlist the entry into ongoing 
transaction
      * if there is one.
@@ -384,14 +383,14 @@ public interface IgniteCache<K, V> extends 
javax.cache.Cache<K, V>, IgniteAsyncS
      * caches, the value will be loaded from the primary node, which in its 
turn may load the value
      * from the disk-based swap storage, and consecutively, if it's not in 
swap,
      * from the underlying persistent storage. If value has to be loaded from 
persistent
-     * storage, {@link GridCacheStore#load(IgniteTx, Object)} method will be 
used.
+     * storage, {@link org.apache.ignite.cache.store.CacheStore#load(IgniteTx, 
Object)} method will be used.
      * <p>
      * If the returned value is not needed, method {@link #removeIf(Object, 
IgnitePredicate)} should
      * always be used instead of this one to avoid the overhead associated 
with returning of the
      * previous value.
      * <p>
-     * If write-through is enabled, the value will be removed from {@link 
GridCacheStore}
-     * via {@link GridCacheStore#remove(IgniteTx, Object)} method.
+     * If write-through is enabled, the value will be removed from {@link 
org.apache.ignite.cache.store.CacheStore}
+     * via {@link org.apache.ignite.cache.store.CacheStore#remove(IgniteTx, 
Object)} method.
      * <h2 class="header">Transactions</h2>
      * This method is transactional and will enlist the entry into ongoing 
transaction
      * if there is one.
@@ -416,8 +415,8 @@ public interface IgniteCache<K, V> extends 
javax.cache.Cache<K, V>, IgniteAsyncS
      * This method will return {@code true} if remove did occur, which means 
that all optionally
      * provided filters have passed and there was something to remove, {@code 
false} otherwise.
      * <p>
-     * If write-through is enabled, the value will be removed from {@link 
GridCacheStore}
-     * via {@link GridCacheStore#remove(IgniteTx, Object)} method.
+     * If write-through is enabled, the value will be removed from {@link 
org.apache.ignite.cache.store.CacheStore}
+     * via {@link org.apache.ignite.cache.store.CacheStore#remove(IgniteTx, 
Object)} method.
      * <h2 class="header">Transactions</h2>
      * This method is transactional and will enlist the entry into ongoing 
transaction
      * if there is one.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4b8ec5f2/modules/core/src/main/java/org/apache/ignite/cache/store/CacheLoadOnlyStoreAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheLoadOnlyStoreAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheLoadOnlyStoreAdapter.java
new file mode 100644
index 0000000..0d7a85b
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheLoadOnlyStoreAdapter.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store;
+
+import org.apache.ignite.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.transactions.*;
+import org.gridgain.grid.cache.*;
+import org.gridgain.grid.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import javax.cache.*;
+import javax.cache.integration.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+import static java.util.concurrent.TimeUnit.*;
+
+/**
+ * This adapter is designed to support stores with bulk loading from a stream-like 
source.
+ * <p>
+ * This class processes input data in the following way:
+ * <ul>
+ *      <li>
+ *          An iterator over input records is obtained from the user-defined {@link 
#inputIterator(Object...)}.
+ *      </li>
+ *      <li>
+ *          The iterator is continuously queried for input records, which are 
grouped into batches of {@link #batchSize}.
+ *      </li>
+ *      <li>
+ *          Each batch is placed into the processing queue and pulled by one of the {@link 
#threadsCnt} working threads.
+ *      </li>
+ *      <li>
+ *          Each record in batch is passed to user-defined {@link 
#parse(Object, Object...)} method
+ *          and result is stored into cache.
+ *      </li>
+ * </ul>
+ * <p>
+ * Two methods should be implemented by inheritors:
+ * <ul>
+ *      <li>
+ *          {@link #inputIterator(Object...)}. It should open the underlying data 
source
+ *          and iterate over all records available in it. Individual records could 
be in very raw form,
+ *          like text lines for CSV files.
+ *      </li>
+ *      <li>
+ *          {@link #parse(Object, Object...)}. This method should process 
input records
+ *          and transform them into key-value pairs for cache.
+ *      </li>
+ * </ul>
+ * <p>
+ *
+ * @param <K> Key type.
+ * @param <V> Value type.
+ * @param <I> Input type.
+ */
+public abstract class CacheLoadOnlyStoreAdapter<K, V, I> implements 
CacheStore<K, V> {
+    /**
+     * Default batch size (number of records read with {@link 
#inputIterator(Object...)}
+     * and then submitted to internal pool at a time).
+     */
+    public static final int DFLT_BATCH_SIZE = 100;
+
+    /** Default batch queue size (max batches count to limit memory usage). */
+    public static final int DFLT_BATCH_QUEUE_SIZE = 100;
+
+    /** Default number of working threads (equal to the number of available 
processors). */
+    public static final int DFLT_THREADS_COUNT = 
Runtime.getRuntime().availableProcessors();
+
+    /** Auto-injected logger. */
+    @IgniteLoggerResource
+    private IgniteLogger log;
+
+    /** Batch size. */
+    private int batchSize = DFLT_BATCH_SIZE;
+
+    /** Size of queue of batches to process. */
+    private int batchQueueSize = DFLT_BATCH_QUEUE_SIZE;
+
+    /** Number of working threads. */
+    private int threadsCnt = DFLT_THREADS_COUNT;
+
+    /**
+     * Returns iterator of input records.
+     * <p>
+     * Note that returned iterator doesn't have to be thread-safe. Thus it 
could
+     * operate on raw streams, DB connections, etc. without additional 
synchronization.
+     *
+     * @param args Arguments passed into {@link 
GridCache#loadCache(org.apache.ignite.lang.IgniteBiPredicate, long, Object...)} 
method.
+     * @return Iterator over input records.
+     * @throws CacheLoaderException If iterator can't be created with the 
given arguments.
+     */
+    protected abstract Iterator<I> inputIterator(@Nullable Object... args) 
throws CacheLoaderException;
+
+    /**
+     * This method should transform raw data records into valid key-value pairs
+     * to be stored into cache.
+     * <p>
+     * If {@code null} is returned then this record will be just skipped.
+     *
+     * @param rec A raw data record.
+     * @param args Arguments passed into {@link 
GridCache#loadCache(org.apache.ignite.lang.IgniteBiPredicate, long, Object...)} 
method.
+     * @return Cache entry to be saved in cache or {@code null} if no entry 
could be produced from this record.
+     */
+    @Nullable protected abstract IgniteBiTuple<K, V> parse(I rec, @Nullable 
Object... args);
+
+    /** {@inheritDoc} */
+    @Override public void loadCache(IgniteBiInClosure<K, V> c, @Nullable 
Object... args) {
+        ExecutorService exec = new ThreadPoolExecutor(
+            threadsCnt,
+            threadsCnt,
+            0L,
+            MILLISECONDS,
+            new ArrayBlockingQueue<Runnable>(batchQueueSize),
+            new BlockingRejectedExecutionHandler());
+
+        Iterator<I> iter = inputIterator(args);
+
+        Collection<I> buf = new ArrayList<>(batchSize);
+
+        try {
+            while (iter.hasNext()) {
+                if (Thread.currentThread().isInterrupted()) {
+                    U.warn(log, "Working thread was interrupted while loading 
data.");
+
+                    break;
+                }
+
+                buf.add(iter.next());
+
+                if (buf.size() == batchSize) {
+                    exec.submit(new Worker(c, buf, args));
+
+                    buf = new ArrayList<>(batchSize);
+                }
+            }
+
+            if (!buf.isEmpty())
+                exec.submit(new Worker(c, buf, args));
+        }
+        catch (RejectedExecutionException ignored) {
+            // Because of custom RejectedExecutionHandler.
+            assert false : "RejectedExecutionException was thrown while it 
shouldn't.";
+        }
+        finally {
+            exec.shutdown();
+
+            try {
+                exec.awaitTermination(Long.MAX_VALUE, MILLISECONDS);
+            }
+            catch (InterruptedException ignored) {
+                U.warn(log, "Working thread was interrupted while waiting for 
put operations to complete.");
+
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * Returns batch size.
+     *
+     * @return Batch size.
+     */
+    public int getBatchSize() {
+        return batchSize;
+    }
+
+    /**
+     * Sets batch size.
+     *
+     * @param batchSize Batch size.
+     */
+    public void setBatchSize(int batchSize) {
+        this.batchSize = batchSize;
+    }
+
+    /**
+     * Returns batch queue size.
+     *
+     * @return Batch queue size.
+     */
+    public int getBatchQueueSize() {
+        return batchQueueSize;
+    }
+
+    /**
+     * Sets batch queue size.
+     *
+     * @param batchQueueSize Batch queue size.
+     */
+    public void setBatchQueueSize(int batchQueueSize) {
+        this.batchQueueSize = batchQueueSize;
+    }
+
+    /**
+     * Returns number of worker threads.
+     *
+     * @return Number of worker threads.
+     */
+    public int getThreadsCount() {
+        return threadsCnt;
+    }
+
+    /**
+     * Sets number of worker threads.
+     *
+     * @param threadsCnt Number of worker threads.
+     */
+    public void setThreadsCount(int threadsCnt) {
+        this.threadsCnt = threadsCnt;
+    }
+
+    /** {@inheritDoc} */
+    @Override public V load(K key) {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Map<K, V> loadAll(Iterable<? extends K> keys) {
+        return Collections.emptyMap();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void write(Cache.Entry<? extends K, ? extends V> entry) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeAll(Collection<Cache.Entry<? extends K, ? 
extends V>> entries) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void delete(Object key) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void deleteAll(Collection<?> keys) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void txEnd(boolean commit) {
+        // No-op.
+    }
+
+    /**
+     * Worker.
+     */
+    private class Worker implements Runnable {
+        /** */
+        private final IgniteBiInClosure<K, V> c;
+
+        /** */
+        private final Collection<I> buf;
+
+        /** */
+        private final Object[] args;
+
+        /**
+         * @param c Closure for loaded entries.
+         * @param buf Set of input records to process.
+         * @param args Arguments passed into {@link 
GridCache#loadCache(org.apache.ignite.lang.IgniteBiPredicate, long, Object...)} 
method.
+         */
+        Worker(IgniteBiInClosure<K, V> c, Collection<I> buf, Object[] args) {
+            this.c = c;
+            this.buf = buf;
+            this.args = args;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run() {
+            for (I rec : buf) {
+                IgniteBiTuple<K, V> entry = parse(rec, args);
+
+                if (entry != null)
+                    c.apply(entry.getKey(), entry.getValue());
+            }
+        }
+    }
+
+    /**
+     * This handler blocks the caller thread until free space becomes 
available in the task queue.
+     * If the executor is shut down then it throws {@link 
RejectedExecutionException}.
+     * <p>
+     * It is safe to apply this policy when:
+     * <ol>
+     *      <li>{@code shutdownNow} is not used on the pool.</li>
+     *      <li>{@code shutdown} is called from the thread where all 
submissions were performed.</li>
+     * </ol>
+     */
+    private class BlockingRejectedExecutionHandler implements 
RejectedExecutionHandler {
+        /** {@inheritDoc} */
+        @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor 
executor) {
+            try {
+                if (executor.isShutdown())
+                    throw new RejectedExecutionException();
+                else
+                    executor.getQueue().put(r);
+            }
+            catch (InterruptedException ignored) {
+                U.warn(log, "Working thread was interrupted while loading 
data.");
+
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4b8ec5f2/modules/core/src/main/java/org/apache/ignite/cache/store/CacheLocalStore.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheLocalStore.java 
b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheLocalStore.java
new file mode 100644
index 0000000..6fdec5a
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheLocalStore.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store;
+
+import java.lang.annotation.*;
+
+/**
+ * Annotation for local {@link CacheStore} implementation. "Local" here means 
that there is no global
+ * database behind the grid but each node has an independent one.
+ */
+@Documented
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.TYPE})
+public @interface CacheLocalStore {
+    // No-op.
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4b8ec5f2/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStore.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStore.java 
b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStore.java
new file mode 100644
index 0000000..4cdfe5a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStore.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store;
+
+import org.apache.ignite.IgnitePortables;
+import org.apache.ignite.cache.store.jdbc.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.portables.*;
+import org.apache.ignite.transactions.*;
+import org.gridgain.grid.*;
+import org.gridgain.grid.cache.*;
+import org.jetbrains.annotations.*;
+
+import javax.cache.integration.*;
+import java.sql.*;
+import java.util.*;
+import java.util.Date;
+
+import static javax.cache.Cache.*;
+
+/**
+ * API for cache persistent storage for read-through and write-through 
behavior.
+ * Persistent store is configured via {@link GridCacheConfiguration#getStore()}
+ * configuration property. If not provided, values will be only kept in cache 
memory
+ * or swap storage without ever being persisted to a persistent storage.
+ * <p>
+ * {@link CacheStoreAdapter} provides default implementation for bulk 
operations,
+ * such as {@link #loadAll(Iterable)},
+ * {@link #writeAll(Collection)}, and {@link #deleteAll(Collection)}
+ * by sequentially calling corresponding {@link #load(Object)},
+ * {@link #write(Entry)}, and {@link #delete(Object)}
+ * operations. Use this adapter whenever such behaviour is acceptable. However
+ * in many cases it may be preferable to take advantage of database batch 
update
+ * functionality, and therefore default adapter implementation may not be the 
best option.
+ * <p>
+ * Provided implementations may be used for test purposes:
+ * <ul>
+ *     <li>{@gglink 
org.gridgain.grid.cache.store.hibernate.GridCacheHibernateBlobStore}</li>
+ *     <li>{@link CacheJdbcBlobStore}</li>
+ * </ul>
+ * <p>
+ * All transactional operations of this API are provided with ongoing {@link 
IgniteTx},
+ * if any. As transaction is {@link GridMetadataAware}, you can attach any 
metadata to
+ * it, e.g. to recognize if several operations belong to the same transaction 
or not.
+ * Here is an example of how to attach a JDBC connection as transaction metadata:
+ * <pre name="code" class="java">
+ * Connection conn = tx.meta("some.name");
+ *
+ * if (conn == null) {
+ *     conn = ...; // Get JDBC connection.
+ *
+ *     // Store connection in transaction metadata, so it can be accessed
+ *     // for other operations on the same transaction.
+ *     tx.addMeta("some.name", conn);
+ * }
+ * </pre>
+ * <h1 class="header">Working With Portable Objects</h1>
+ * When portables are enabled for cache by setting {@link 
GridCacheConfiguration#isPortableEnabled()} to
+ * {@code true}, all portable keys and values are converted to instances of 
{@link PortableObject}.
+ * Therefore, all cache store methods will take parameters in portable format. 
To avoid class
+ * cast exceptions, store must have signature compatible with portables. E.g., 
if you use {@link Integer}
+ * as a key and {@code Value} class as a value (which will be converted to 
portable format), cache store
+ * signature should be the following:
+ * <pre name="code" class="java">
+ * public class PortableCacheStore implements GridCacheStore&lt;Integer, 
GridPortableObject&gt; {
+ *     public void put(@Nullable GridCacheTx tx, Integer key, 
GridPortableObject val) throws IgniteCheckedException {
+ *         ...
+ *     }
+ *
+ *     ...
+ * }
+ * </pre>
+ * This behavior can be overridden by setting {@link 
GridCacheConfiguration#setKeepPortableInStore(boolean)}
+ * flag value to {@code false}. In this case, GridGain will deserialize keys 
and values stored in portable
+ * format before they are passed to cache store, so that you can use the 
following cache store signature instead:
+ * <pre name="code" class="java">
+ * public class ObjectsCacheStore implements GridCacheStore&lt;Integer, 
Person&gt; {
+ *     public void put(@Nullable GridCacheTx tx, Integer key, Person val) 
throws GridException {
+ *         ...
+ *     }
+ *
+ *     ...
+ * }
+ * </pre>
+ * Note that while this can simplify store implementation in some cases, it 
will cause performance degradation
+ * due to additional serializations and deserializations of portable objects. 
You will also need to have key
+ * and value classes on all nodes since portables will be deserialized when 
store is invoked.
+ * <p>
+ * Note that only portable classes are converted to {@link PortableObject} 
format. Following
+ * types are stored in cache without changes and therefore should not affect 
cache store signature:
+ * <ul>
+ *     <li>All primitives (byte, int, ...) and their boxed versions (Byte, 
Integer, ...)</li>
+ *     <li>Arrays of primitives (byte[], int[], ...)</li>
+ *     <li>{@link String} and array of {@link String}s</li>
+ *     <li>{@link UUID} and array of {@link UUID}s</li>
+ *     <li>{@link Date} and array of {@link Date}s</li>
+ *     <li>{@link Timestamp} and array of {@link Timestamp}s</li>
+ *     <li>Enums and array of enums</li>
+ *     <li>
+ *         Maps, collections and array of objects (but objects inside
+ *         them will still be converted if they are portable)
+ *     </li>
+ * </ul>
+ *
+ * @see IgnitePortables
+ */
+public interface CacheStore<K, V> extends CacheLoader<K, V>, CacheWriter<K, V> 
{
+    /**
+     * Loads all values from underlying persistent storage. Note that keys are 
not
+     * passed, so it is up to implementation to figure out what to load. This 
method
+     * is called whenever {@link 
GridCache#loadCache(org.apache.ignite.lang.IgniteBiPredicate, long, Object...)}
+     * method is invoked which is usually to preload the cache from persistent 
storage.
+     * <p>
+     * This method is optional, and cache implementation does not depend on 
this
+     * method to do anything. Default implementation of this method in
+     * {@link CacheStoreAdapter} does nothing.
+     * <p>
+     * For every loaded value method {@link 
org.apache.ignite.lang.IgniteBiInClosure#apply(Object, Object)}
+     * should be called on the passed in closure. The closure will then make 
sure
+     * that the loaded value is stored in cache.
+     *
+     * @param clo Closure for loaded values.
+     * @param args Arguments passed into
+     *      {@link 
GridCache#loadCache(org.apache.ignite.lang.IgniteBiPredicate, long, Object...)} 
method.
+     * @throws CacheLoaderException If loading failed.
+     */
+    public void loadCache(IgniteBiInClosure<K, V> clo, @Nullable Object... 
args) throws CacheLoaderException;
+
+    /**
+     * Tells store to commit or rollback a transaction depending on the value 
of the {@code 'commit'}
+     * parameter.
+     *
+     * @param commit {@code True} if transaction should commit, {@code false} 
for rollback.
+     * @throws CacheWriterException If commit or rollback failed. Note that 
commit failure in some cases
+     *      may bring cache transaction into {@link IgniteTxState#UNKNOWN} 
which will
+     *      consequently cause all transacted entries to be invalidated.
+     */
+    public void txEnd(boolean commit) throws CacheWriterException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4b8ec5f2/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreAdapter.java
new file mode 100644
index 0000000..4ec5cbf
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreAdapter.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store;
+
+import org.apache.ignite.lang.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.transactions.*;
+import org.gridgain.grid.cache.*;
+import org.jetbrains.annotations.*;
+
+import javax.cache.*;
+import java.util.*;
+
+/**
+ * Cache storage convenience adapter. It provides default implementation for 
bulk operations, such
+ * as {@link #loadAll(Iterable)},
+ * {@link #writeAll(Collection)}, and {@link #deleteAll(Collection)}
+ * by sequentially calling corresponding {@link #load(Object)},
+ * {@link #write(Cache.Entry)}, and {@link #delete(Object)}
+ * operations. Use this adapter whenever such behaviour is acceptable. However 
in many cases
+ * it may be preferable to take advantage of database batch update 
functionality, and therefore
+ * default adapter implementation may not be the best option.
+ * <p>
+ * Note that method {@link 
#loadCache(org.apache.ignite.lang.IgniteBiInClosure, Object...)} has empty
+ * implementation because it is essentially up to the user to invoke it with
+ * specific arguments.
+ */
+public abstract class CacheStoreAdapter<K, V> implements CacheStore<K, V> {
+    /** */
+    @IgniteCacheSessionResource
+    private CacheStoreSession ses;
+
+    /**
+     * Default empty implementation. This method needs to be overridden only if
+     * {@link GridCache#loadCache(IgniteBiPredicate, long, Object...)} method
+     * is explicitly called.
+     *
+     * @param clo {@inheritDoc}
+     * @param args {@inheritDoc}
+     */
+    @Override public void loadCache(IgniteBiInClosure<K, V> clo, Object... 
args) {
+        /* No-op. */
+    }
+
+    /** {@inheritDoc} */
+    @Override public Map<K, V> loadAll(Iterable<? extends K> keys) {
+        assert keys != null;
+
+        Map<K, V> loaded = new HashMap<>();
+
+        for (K key : keys) {
+            V v = load(key);
+
+            if (v != null)
+                loaded.put(key, v);
+        }
+
+        return loaded;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeAll(Collection<Cache.Entry<? extends K, ? 
extends V>> entries) {
+        assert entries != null;
+
+        for (Cache.Entry<? extends K, ? extends V> e : entries)
+            write(e);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void deleteAll(Collection<?> keys) {
+        assert keys != null;
+
+        for (Object key : keys)
+            delete(key);
+    }
+
+    /**
+     * Default empty implementation for ending transactions. Note that if 
explicit cache
+     * transactions are not used, then transactions do not have to be 
explicitly ended -
+     * for all other cases this method should be overridden with custom 
commit/rollback logic.
+     *
+     * @param commit {@inheritDoc}
+     */
+    @Override public void txEnd(boolean commit) {
+        // No-op.
+    }
+
+    /**
+     * @return Current session.
+     */
+    @Nullable protected CacheStoreSession session() {
+        return ses;
+    }
+
+    /**
+     * @return Current transaction.
+     */
+    @Nullable protected IgniteTx transaction() {
+        return ses != null ? ses.transaction() : null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4b8ec5f2/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreBalancingWrapper.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreBalancingWrapper.java b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreBalancingWrapper.java
new file mode 100644
index 0000000..c76d4ec
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreBalancingWrapper.java
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store;
+
+import org.apache.ignite.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.transactions.*;
+import org.gridgain.grid.util.future.*;
+import org.gridgain.grid.util.typedef.*;
+import org.jdk8.backport.*;
+import org.jetbrains.annotations.*;
+
+import javax.cache.*;
+import javax.cache.integration.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+/**
+ * Cache store wrapper that ensures that there will be no more than one thread loading a value
+ * for the same key from the underlying store.
+ * <p>
+ * Loads issued through {@link #load(Object)} and {@link #loadAll(Collection, IgniteBiInClosure)}
+ * are tracked in a map of pending futures keyed by cache key. A caller that finds a pending future
+ * for its key takes the value from that future instead of calling the delegate. All other
+ * operations, including {@link #loadAll(Iterable)}, are passed to the delegate unchanged.
+ */
+public class CacheStoreBalancingWrapper<K, V> implements CacheStore<K, V> {
+    /** Default load all threshold. */
+    public static final int DFLT_LOAD_ALL_THRESHOLD = 5;
+
+    /** Delegate store. */
+    private final CacheStore<K, V> delegate;
+
+    /** Pending cache store loads. */
+    private final ConcurrentMap<K, LoadFuture> pendingLoads = new ConcurrentHashMap8<>();
+
+    /** Load all threshold. */
+    private final int loadAllThreshold;
+
+    /**
+     * Creates a wrapper with {@link #DFLT_LOAD_ALL_THRESHOLD}.
+     *
+     * @param delegate Delegate store.
+     */
+    public CacheStoreBalancingWrapper(CacheStore<K, V> delegate) {
+        this(delegate, DFLT_LOAD_ALL_THRESHOLD);
+    }
+
+    /**
+     * @param delegate Delegate store.
+     * @param loadAllThreshold Load all threshold.
+     */
+    public CacheStoreBalancingWrapper(CacheStore<K, V> delegate, int loadAllThreshold) {
+        this.delegate = delegate;
+        this.loadAllThreshold = loadAllThreshold;
+    }
+
+    /**
+     * @return Load all threshold.
+     */
+    public int loadAllThreshold() {
+        return loadAllThreshold;
+    }
+
+    /**
+     * Loads the value for the key. If a load of the same key is already pending, the value is
+     * taken from the pending future; otherwise this call registers its own future, loads from
+     * the delegate and completes the future with the outcome.
+     *
+     * @param key Key to load.
+     * @return Loaded value (possibly {@code null}).
+     * @throws CacheLoaderException If getting the value from a pending future failed.
+     */
+    @Nullable @Override public V load(K key) {
+        LoadFuture fut = pendingLoads.get(key);
+
+        try {
+            if (fut != null)
+                return fut.get(key);
+
+            fut = new LoadFuture();
+
+            LoadFuture old = pendingLoads.putIfAbsent(key, fut);
+
+            // Another caller registered a future for this key first - use its result.
+            if (old != null)
+                return old.get(key);
+        }
+        catch (IgniteCheckedException e) {
+            throw new CacheLoaderException(e);
+        }
+
+        // This call owns the pending future and must complete it on every path.
+        try {
+            V val = delegate.load(key);
+
+            fut.onComplete(key, val);
+
+            return val;
+        }
+        catch (Throwable e) {
+            fut.onError(key, e);
+
+            throw e;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void loadCache(IgniteBiInClosure<K, V> clo, @Nullable Object... args) {
+        delegate.loadCache(clo, args);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Map<K, V> loadAll(Iterable<? extends K> keys) throws CacheLoaderException {
+        return delegate.loadAll(keys);
+    }
+
+    /**
+     * Loads values for the given keys and passes them to the closure. Keys that already have a
+     * pending load are not requested from the delegate; their values are taken from the pending
+     * futures after the delegate call for the remaining keys has finished.
+     * <p>
+     * The number of keys is asserted to be less than the load all threshold.
+     *
+     * @param keys Keys to load.
+     * @param c Closure for loaded values.
+     * @throws CacheLoaderException If getting a value from a pending future failed.
+     */
+    public void loadAll(Collection<? extends K> keys, final IgniteBiInClosure<K, V> c) {
+        assert keys.size() < loadAllThreshold;
+
+        Collection<K> needLoad = null;
+        Map<K, LoadFuture> pending = null;
+        LoadFuture span = null;
+
+        for (K key : keys) {
+            LoadFuture fut = pendingLoads.get(key);
+
+            if (fut != null) {
+                if (pending == null)
+                    pending = new HashMap<>();
+
+                pending.put(key, fut);
+            }
+            else {
+                // Try to concurrently add pending future.
+                // A single future is shared by all keys this call loads from the delegate.
+                if (span == null)
+                    span = new LoadFuture();
+
+                LoadFuture old = pendingLoads.putIfAbsent(key, span);
+
+                if (old != null) {
+                    if (pending == null)
+                        pending = new HashMap<>();
+
+                    pending.put(key, old);
+                }
+                else {
+                    if (needLoad == null)
+                        needLoad = new ArrayList<>(keys.size());
+
+                    needLoad.add(key);
+                }
+            }
+        }
+
+        if (needLoad != null) {
+            assert !needLoad.isEmpty();
+            assert span != null;
+
+            try {
+                Map<K, V> loaded = delegate.loadAll(needLoad);
+
+                for (Map.Entry<K, V> e : loaded.entrySet())
+                    c.apply(e.getKey(), e.getValue());
+
+                span.onComplete(needLoad, loaded);
+            }
+            catch (Throwable e) {
+                span.onError(needLoad, e);
+
+                throw e;
+            }
+        }
+
+        if (pending != null) {
+            try {
+                for (Map.Entry<K, LoadFuture> e : pending.entrySet()) {
+                    K key = e.getKey();
+
+                    c.apply(key, e.getValue().get(key));
+                }
+            }
+            catch (IgniteCheckedException e) {
+                throw new CacheLoaderException(e);
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void write(Cache.Entry<? extends K, ? extends V> entry) {
+        delegate.write(entry);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeAll(Collection<Cache.Entry<? extends K, ? extends V>> entries) {
+        delegate.writeAll(entries);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void delete(Object key) throws CacheWriterException {
+        delegate.delete(key);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void deleteAll(Collection<?> keys) throws CacheWriterException {
+        delegate.deleteAll(keys);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void txEnd(boolean commit) {
+        delegate.txEnd(commit);
+    }
+
+    /**
+     * Future for one delegate load call. Its result is the map of values loaded by that call;
+     * when the future is done it removes its own entries from {@link #pendingLoads}.
+     */
+    private class LoadFuture extends GridFutureAdapter<Map<K, V>> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Collection of keys for pending cleanup. */
+        private volatile Collection<K> keys;
+
+        /**
+         * Creates a future that is not yet associated with any keys.
+         */
+        public LoadFuture() {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean onDone(@Nullable Map<K, V> res, @Nullable Throwable err) {
+            if (super.onDone(res, err)) {
+                assert keys != null;
+
+                // Remove only mappings that still point to this future.
+                for (K key : keys)
+                    pendingLoads.remove(key, this);
+
+                return true;
+            }
+
+            return false;
+        }
+
+        /**
+         * Completes the future with the value loaded for a single key.
+         *
+         * @param key Key.
+         * @param val Loaded value.
+         */
+        public void onComplete(K key, V val) {
+            onComplete(Collections.singletonList(key), F.asMap(key, val));
+        }
+
+        /**
+         * Completes the future with the values loaded for the given keys.
+         *
+         * @param keys Keys.
+         * @param res Loaded values.
+         */
+        public void onComplete(Collection<K> keys, Map<K, V> res) {
+            this.keys = keys;
+
+            onDone(res);
+        }
+
+        /**
+         * Completes the future with an error for a single key.
+         *
+         * @param key Key.
+         * @param err Error.
+         */
+        public void onError(K key, Throwable err) {
+            // The future must be completed here: otherwise it is never removed from the pending
+            // map and every later load of this key would wait on it.
+            onError(Collections.singletonList(key), err);
+        }
+
+        /**
+         * Completes the future with an error for the given keys.
+         *
+         * @param keys Keys.
+         * @param err Error.
+         */
+        public void onError(Collection<K> keys, Throwable err) {
+            this.keys = keys;
+
+            onDone(err);
+        }
+
+        /**
+         * Gets value loaded for key k.
+         *
+         * @param key Key to load.
+         * @return Loaded value (possibly {@code null}).
+         * @throws IgniteCheckedException If load failed.
+         */
+        public V get(K key) throws IgniteCheckedException {
+            return get().get(key);
+        }
+    }
+}

Reply via email to