Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-sprint-5 541b1e0f6 -> 2f615220d


IGNITE-891 - Cache store improvements


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b97441f9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b97441f9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b97441f9

Branch: refs/heads/ignite-sprint-5
Commit: b97441f994bb34b23c54848f6520f506b4094590
Parents: 896b426
Author: Valentin Kulichenko <vkuliche...@gridgain.com>
Authored: Thu May 14 19:12:43 2015 -0700
Committer: Valentin Kulichenko <vkuliche...@gridgain.com>
Committed: Thu May 14 19:12:43 2015 -0700

----------------------------------------------------------------------
 .../store/jdbc/CacheJdbcPersonStore.java        | 137 +++----------
 .../store/jdbc/CacheJdbcStoreExample.java       |  14 ++
 .../cache/store/CacheStoreSessionListener.java  |  27 +++
 .../jdbc/CacheStoreSessionJdbcListener.java     | 100 +++++++++
 .../configuration/CacheConfiguration.java       |  26 +++
 .../configuration/IgniteConfiguration.java      |  31 ++-
 .../cache/GridCacheSharedContext.java           |   4 -
 .../cache/store/CacheOsStoreManager.java        |   1 -
 .../store/GridCacheStoreManagerAdapter.java     | 134 +++++++++---
 .../cache/transactions/IgniteTxAdapter.java     |  24 ++-
 .../transactions/IgniteTxLocalAdapter.java      |  57 ++++--
 ...cheStoreSessionListenerAbstractSelfTest.java | 204 +++++++++++++++++++
 .../CacheStoreSessionJdbcListenerSelfTest.java  | 145 +++++++++++++
 .../testsuites/IgniteCacheTestSuite4.java       |   3 +
 .../CacheStoreSessionHibernateListener.java     | 106 ++++++++++
 ...heStoreSessionHibernateListenerSelfTest.java | 150 ++++++++++++++
 modules/spring/pom.xml                          |  14 ++
 .../spring/CacheStoreSessionSpringListener.java | 125 ++++++++++++
 ...CacheStoreSessionSpringListenerSelfTest.java | 179 ++++++++++++++++
 .../testsuites/IgniteSpringTestSuite.java       |   3 +
 20 files changed, 1307 insertions(+), 177 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b97441f9/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/CacheJdbcPersonStore.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/CacheJdbcPersonStore.java
 
b/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/CacheJdbcPersonStore.java
index 791f861..856512b 100644
--- 
a/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/CacheJdbcPersonStore.java
+++ 
b/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/CacheJdbcPersonStore.java
@@ -19,26 +19,21 @@ package org.apache.ignite.examples.datagrid.store.jdbc;
 
 import org.apache.ignite.*;
 import org.apache.ignite.cache.store.*;
+import org.apache.ignite.cache.store.jdbc.*;
 import org.apache.ignite.examples.datagrid.store.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.resources.*;
-import org.jetbrains.annotations.*;
 
 import javax.cache.*;
 import javax.cache.integration.*;
 import java.sql.*;
-import java.util.*;
 
 /**
  * Example of {@link CacheStore} implementation that uses JDBC
  * transaction with cache transactions and maps {@link Long} to {@link Person}.
- *
  */
 public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> {
-    /** Transaction metadata attribute name. */
-    private static final String ATTR_NAME = "SIMPLE_STORE_CONNECTION";
-
-    /** Auto-injected store session. */
+    /** Store session. */
     @CacheStoreSessionResource
     private CacheStoreSession ses;
 
@@ -58,11 +53,12 @@ public class CacheJdbcPersonStore extends 
CacheStoreAdapter<Long, Person> {
      * @throws IgniteException If failed.
      */
     private void prepareDb() throws IgniteException {
-        try (Connection conn = openConnection(false); Statement st = 
conn.createStatement()) {
+        try (
+            Connection conn = 
DriverManager.getConnection("jdbc:h2:mem:example;DB_CLOSE_DELAY=-1");
+            Statement st = conn.createStatement()
+        ) {
             st.execute("create table if not exists PERSONS (id number unique, 
firstName varchar(255), " +
                 "lastName varchar(255))");
-
-            conn.commit();
         }
         catch (SQLException e) {
             throw new IgniteException("Failed to create database table.", e);
@@ -70,32 +66,11 @@ public class CacheJdbcPersonStore extends 
CacheStoreAdapter<Long, Person> {
     }
 
     /** {@inheritDoc} */
-    @Override public void sessionEnd(boolean commit) {
-        Map<String, Connection> props = ses.properties();
-
-        try (Connection conn = props.remove(ATTR_NAME)) {
-            if (conn != null) {
-                if (commit)
-                    conn.commit();
-                else
-                    conn.rollback();
-            }
-
-            System.out.println(">>> Transaction ended [commit=" + commit + 
']');
-        }
-        catch (SQLException e) {
-            throw new CacheWriterException("Failed to end transaction: " + 
ses.transaction(), e);
-        }
-    }
-
-    /** {@inheritDoc} */
     @Override public Person load(Long key) {
         System.out.println(">>> Loading key: " + key);
 
-        Connection conn = null;
-
         try {
-            conn = connection();
+            Connection conn = connection();
 
             try (PreparedStatement st = conn.prepareStatement("select * from 
PERSONS where id=?")) {
                 st.setString(1, key.toString());
@@ -109,9 +84,6 @@ public class CacheJdbcPersonStore extends 
CacheStoreAdapter<Long, Person> {
         catch (SQLException e) {
             throw new CacheLoaderException("Failed to load object: " + key, e);
         }
-        finally {
-            end(conn);
-        }
 
         return null;
     }
@@ -124,10 +96,8 @@ public class CacheJdbcPersonStore extends 
CacheStoreAdapter<Long, Person> {
 
         System.out.println(">>> Putting [key=" + key + ", val=" + val +  ']');
 
-        Connection conn = null;
-
         try {
-            conn = connection();
+            Connection conn = connection();
 
             int updated;
 
@@ -157,19 +127,14 @@ public class CacheJdbcPersonStore extends 
CacheStoreAdapter<Long, Person> {
         catch (SQLException e) {
             throw new CacheLoaderException("Failed to put object [key=" + key 
+ ", val=" + val + ']', e);
         }
-        finally {
-            end(conn);
-        }
     }
 
     /** {@inheritDoc} */
     @Override public void delete(Object key) {
         System.out.println(">>> Removing key: " + key);
 
-        Connection conn = null;
-
         try {
-            conn = connection();
+            Connection conn = connection();
 
             try (PreparedStatement st = conn.prepareStatement("delete from 
PERSONS where id=?")) {
                 st.setLong(1, (Long)key);
@@ -180,9 +145,6 @@ public class CacheJdbcPersonStore extends 
CacheStoreAdapter<Long, Person> {
         catch (SQLException e) {
             throw new CacheWriterException("Failed to remove object: " + key, 
e);
         }
-        finally {
-            end(conn);
-        }
     }
 
     /** {@inheritDoc} */
@@ -192,22 +154,23 @@ public class CacheJdbcPersonStore extends 
CacheStoreAdapter<Long, Person> {
 
         final int entryCnt = (Integer)args[0];
 
-        try (Connection conn = connection()) {
-            try (PreparedStatement st = conn.prepareStatement("select * from 
PERSONS")) {
-                try (ResultSet rs = st.executeQuery()) {
-                    int cnt = 0;
+        Connection conn = connection();
 
-                    while (cnt < entryCnt && rs.next()) {
-                        Person person = new Person(rs.getLong(1), 
rs.getString(2), rs.getString(3));
+        try (
+            PreparedStatement st = conn.prepareStatement("select * from 
PERSONS");
+            ResultSet rs = st.executeQuery()
+        ) {
+            int cnt = 0;
 
-                        clo.apply(person.getId(), person);
+            while (cnt < entryCnt && rs.next()) {
+                Person person = new Person(rs.getLong(1), rs.getString(2), 
rs.getString(3));
 
-                        cnt++;
-                    }
+                clo.apply(person.getId(), person);
 
-                    System.out.println(">>> Loaded " + cnt + " values into 
cache.");
-                }
+                cnt++;
             }
+
+            System.out.println(">>> Loaded " + cnt + " values into cache.");
         }
         catch (SQLException e) {
             throw new CacheLoaderException("Failed to load values from cache 
store.", e);
@@ -215,61 +178,11 @@ public class CacheJdbcPersonStore extends 
CacheStoreAdapter<Long, Person> {
     }
 
     /**
-     * @return Connection.
-     * @throws SQLException In case of error.
-     */
-    private Connection connection() throws SQLException  {
-        // If there is an ongoing transaction,
-        // we must reuse the same connection.
-        if (ses.isWithinTransaction()) {
-            Map<Object, Object> props = ses.properties();
-
-            Connection conn = (Connection)props.get(ATTR_NAME);
-
-            if (conn == null) {
-                conn = openConnection(false);
-
-                // Store connection in session properties, so it can be 
accessed
-                // for other operations on the same transaction.
-                props.put(ATTR_NAME, conn);
-            }
-
-            return conn;
-        }
-        // Transaction can be null in case of simple load or put operation.
-        else
-            return openConnection(true);
-    }
-
-    /**
-     * Closes allocated resources depending on transaction status.
-     *
-     * @param conn Allocated connection.
-     */
-    private void end(@Nullable Connection conn) {
-        if (!ses.isWithinTransaction() && conn != null) {
-            // Close connection right away if there is no transaction.
-            try {
-                conn.close();
-            }
-            catch (SQLException ignored) {
-                // No-op.
-            }
-        }
-    }
-
-    /**
-     * Gets connection from a pool.
+     * Gets JDBC connection attached to current session.
      *
-     * @param autocommit {@code true} If connection should use autocommit mode.
-     * @return Pooled connection.
-     * @throws SQLException In case of error.
+     * @return Connection.
      */
-    private Connection openConnection(boolean autocommit) throws SQLException {
-        Connection conn = 
DriverManager.getConnection("jdbc:h2:mem:example;DB_CLOSE_DELAY=-1");
-
-        conn.setAutoCommit(autocommit);
-
-        return conn;
+    private Connection connection() {
+        return ses.<String, 
Connection>properties().get(CacheStoreSessionJdbcListener.JDBC_CONN_KEY);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b97441f9/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/CacheJdbcStoreExample.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/CacheJdbcStoreExample.java
 
b/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/CacheJdbcStoreExample.java
index 1cb73c9..82e1079 100644
--- 
a/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/CacheJdbcStoreExample.java
+++ 
b/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/CacheJdbcStoreExample.java
@@ -18,10 +18,13 @@
 package org.apache.ignite.examples.datagrid.store.jdbc;
 
 import org.apache.ignite.*;
+import org.apache.ignite.cache.store.*;
+import org.apache.ignite.cache.store.jdbc.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.examples.*;
 import org.apache.ignite.examples.datagrid.store.*;
 import org.apache.ignite.transactions.*;
+import org.h2.jdbcx.*;
 
 import javax.cache.configuration.*;
 import java.util.*;
@@ -71,6 +74,17 @@ public class CacheJdbcStoreExample {
             // Configure JDBC store.
             
cacheCfg.setCacheStoreFactory(FactoryBuilder.factoryOf(CacheJdbcPersonStore.class));
 
+            // Configure JDBC session listener.
+            cacheCfg.setCacheStoreSessionListenerFactories(new 
Factory<CacheStoreSessionListener>() {
+                @Override public CacheStoreSessionListener create() {
+                    CacheStoreSessionJdbcListener lsnr = new 
CacheStoreSessionJdbcListener();
+
+                    
lsnr.setDataSource(JdbcConnectionPool.create("jdbc:h2:mem:example;DB_CLOSE_DELAY=-1",
 "", ""));
+
+                    return lsnr;
+                }
+            });
+
             cacheCfg.setReadThrough(true);
             cacheCfg.setWriteThrough(true);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b97441f9/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSessionListener.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSessionListener.java
 
b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSessionListener.java
new file mode 100644
index 0000000..e57714b
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSessionListener.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store;
+
+/**
+ * Listener that is notified when a cache store session starts and when it ends.
+ */
+public interface CacheStoreSessionListener {
+    public void onSessionStart(CacheStoreSession ses);
+
+    public void onSessionEnd(CacheStoreSession ses, boolean commit);
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b97441f9/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheStoreSessionJdbcListener.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheStoreSessionJdbcListener.java
 
b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheStoreSessionJdbcListener.java
new file mode 100644
index 0000000..9622063
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheStoreSessionJdbcListener.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.jdbc;
+
+import org.apache.ignite.cache.store.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.resources.*;
+
+import javax.cache.integration.*;
+import javax.sql.*;
+import java.sql.*;
+import java.util.*;
+
+/**
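+ * Cache store session listener that, on session start, stores a JDBC connection from the configured data source in the session properties under {@link #JDBC_CONN_KEY}, and on session end commits or rolls it back and closes it.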
+ */
+public class CacheStoreSessionJdbcListener implements 
CacheStoreSessionListener {
+    /** Session key for JDBC connection. */
+    public static final String JDBC_CONN_KEY = "__jdbc_conn_";
+
+    /** Data source. */
+    private DataSource dataSrc;
+
+    /** Store session. */
+    @CacheStoreSessionResource
+    private CacheStoreSession ses;
+
+    /**
+     * Sets data source.
+     *
+     * @param dataSrc Data source.
+     */
+    public void setDataSource(DataSource dataSrc) {
+        A.notNull(dataSrc, "dataSrc");
+
+        this.dataSrc = dataSrc;
+    }
+
+    /**
+     * Gets data source.
+     *
+     * @return Data source.
+     */
+    public DataSource getDataSource() {
+        return dataSrc;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onSessionStart(CacheStoreSession ses) {
+        Map<String, Connection> props = ses.properties();
+
+        if (!props.containsKey(JDBC_CONN_KEY)) {
+            try {
+                Connection conn = dataSrc.getConnection();
+
+                conn.setAutoCommit(false);
+
+                props.put(JDBC_CONN_KEY, conn);
+            }
+            catch (SQLException e) {
+                throw new CacheWriterException("Failed to start store session 
[tx=" + ses.transaction() + ']', e);
+            }
+        }
+    }
+
+    /** {@inheritDoc} Commits or rolls back the connection stored under {@link #JDBC_CONN_KEY}, then closes it. */
+    @Override public void onSessionEnd(CacheStoreSession ses, boolean commit) {
+        Connection conn = ses.<String, 
Connection>properties().remove(JDBC_CONN_KEY);
+
+        if (conn != null) { // Nothing to finish if no connection is attached to the session.
+            try {
+                if (commit)
+                    conn.commit();
+                else
+                    conn.rollback();
+            }
+            catch (SQLException e) {
+                throw new CacheWriterException("Failed to end store session 
[tx=" + ses.transaction() + ']', e);
+            }
+            finally {
+                U.closeQuiet(conn); // Release the connection even if commit/rollback failed.
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b97441f9/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
 
b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
index df6b2ee..33a5711c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
@@ -316,6 +316,9 @@ public class CacheConfiguration<K, V> extends 
MutableConfiguration<K, V> {
     /** Cache topology validator. */
     private TopologyValidator topValidator;
 
+    /** Cache store session listeners. */
+    private Factory<? extends CacheStoreSessionListener>[] storeSesLsnrs;
+
     /** Empty constructor (all values are initialized to their defaults). */
     public CacheConfiguration() {
         /* No-op. */
@@ -389,6 +392,7 @@ public class CacheConfiguration<K, V> extends 
MutableConfiguration<K, V> {
         sqlOnheapRowCacheSize = cc.getSqlOnheapRowCacheSize();
         startSize = cc.getStartSize();
         storeFactory = cc.getCacheStoreFactory();
+        storeSesLsnrs = cc.getCacheStoreSessionListenerFactories();
         swapEnabled = cc.isSwapEnabled();
         tmLookupClsName = cc.getTransactionManagerLookupClassName();
         topValidator = cc.getTopologyValidator();
@@ -1734,6 +1738,28 @@ public class CacheConfiguration<K, V> extends 
MutableConfiguration<K, V> {
         return this;
     }
 
+    /**
+     * Gets cache store session listener factories.
+     *
+     * @return Cache store session listener factories.
+     */
+    public Factory<? extends CacheStoreSessionListener>[] 
getCacheStoreSessionListenerFactories() {
+        return storeSesLsnrs;
+    }
+
+    /**
+     * Sets cache store session listener factories.
+     *
+     * @param storeSesLsnrs Cache store session listener factories.
+     * @return {@code this} for chaining.
+     */
+    public CacheConfiguration setCacheStoreSessionListenerFactories(
+        Factory<? extends CacheStoreSessionListener>... storeSesLsnrs) {
+        this.storeSesLsnrs = storeSesLsnrs;
+
+        return this;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(CacheConfiguration.class, this);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b97441f9/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
 
b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
index ebe2b8e..96ac7e3 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.configuration;
 
 import org.apache.ignite.*;
+import org.apache.ignite.cache.store.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.compute.*;
 import org.apache.ignite.events.*;
@@ -52,6 +53,7 @@ import org.apache.ignite.spi.loadbalancing.roundrobin.*;
 import org.apache.ignite.spi.swapspace.*;
 import org.apache.ignite.spi.swapspace.file.*;
 
+import javax.cache.configuration.*;
 import javax.cache.event.*;
 import javax.cache.expiry.*;
 import javax.cache.integration.*;
@@ -334,9 +336,6 @@ public class IgniteConfiguration {
     /** Cache configurations. */
     private CacheConfiguration[] cacheCfg;
 
-    /** Client cache configurations. */
-    private NearCacheConfiguration[] nearCacheCfg;
-
     /** Client mode flag. */
     private Boolean clientMode;
 
@@ -398,6 +397,9 @@ public class IgniteConfiguration {
     /** User's class loader. */
     private ClassLoader classLdr;
 
+    /** Cache store session listeners. */
+    private Factory<CacheStoreSessionListener>[] storeSesLsnrs;
+
     /**
      * Creates valid grid configuration with all default values.
      */
@@ -478,6 +480,7 @@ public class IgniteConfiguration {
         segResolvers = cfg.getSegmentationResolvers();
         sndRetryCnt = cfg.getNetworkSendRetryCount();
         sndRetryDelay = cfg.getNetworkSendRetryDelay();
+        storeSesLsnrs = cfg.getCacheStoreSessionListenerFactories();
         svcCfgs = cfg.getServiceConfiguration();
         sysPoolSize = cfg.getSystemThreadPoolSize();
         timeSrvPortBase = cfg.getTimeServerPortBase();
@@ -2242,6 +2245,28 @@ public class IgniteConfiguration {
         return classLdr;
     }
 
+    /**
+     * Gets cache store session listener factories.
+     *
+     * @return Cache store session listener factories.
+     */
+    public Factory<CacheStoreSessionListener>[] 
getCacheStoreSessionListenerFactories() {
+        return storeSesLsnrs;
+    }
+
+    /**
+     * Sets cache store session listener factories.
+     *
+     * @param storeSesLsnrs Cache store session listener factories.
+     * @return {@code this} for chaining.
+     */
+    public IgniteConfiguration setCacheStoreSessionListenerFactories(
+        Factory<CacheStoreSessionListener>... storeSesLsnrs) {
+        this.storeSesLsnrs = storeSesLsnrs;
+
+        return this;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(IgniteConfiguration.class, this);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b97441f9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index 294c2b0..dacd1aa 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -441,10 +441,6 @@ public class GridCacheSharedContext<K, V> {
                 if (activeCacheCtx.cacheId() != cacheCtx.cacheId())
                     return false;
             }
-
-            // Check that caches have the same store.
-            if (activeCacheCtx.store().store() != cacheCtx.store().store())
-                return false;
         }
 
         return true;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b97441f9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheOsStoreManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheOsStoreManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheOsStoreManager.java
index 5fde622..02fe679 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheOsStoreManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheOsStoreManager.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.processors.cache.store;
 
-import org.apache.ignite.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.*;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b97441f9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
index f9a966c..a9ea2c0 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
@@ -35,6 +35,7 @@ import org.apache.ignite.transactions.*;
 import org.jetbrains.annotations.*;
 
 import javax.cache.*;
+import javax.cache.configuration.*;
 import javax.cache.integration.*;
 import java.util.*;
 
@@ -59,11 +60,17 @@ public abstract class GridCacheStoreManagerAdapter extends 
GridCacheManagerAdapt
     private ThreadLocal<SessionData> sesHolder;
 
     /** */
+    private ThreadLocalSession locSes;
+
+    /** */
     private boolean locStore;
 
     /** */
     private boolean writeThrough;
 
+    /** */
+    private CacheStoreSessionListener[] sesLsnrs;
+
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
     @Override public void initialize(@Nullable CacheStore cfgStore, Map 
sesHolders) throws IgniteCheckedException {
@@ -84,19 +91,43 @@ public abstract class GridCacheStoreManagerAdapter extends 
GridCacheManagerAdapt
             sesHolder0 = ((Map<CacheStore, 
ThreadLocal>)sesHolders).get(cfgStore);
 
             if (sesHolder0 == null) {
-                ThreadLocalSession locSes = new ThreadLocalSession();
+                sesHolder0 = new ThreadLocal<>();
 
-                if (ctx.resource().injectStoreSession(cfgStore, locSes)) {
-                    sesHolder0 = locSes.sesHolder;
+                locSes = new ThreadLocalSession(sesHolder0);
 
+                if (ctx.resource().injectStoreSession(cfgStore, locSes))
                     sesHolders.put(cfgStore, sesHolder0);
-                }
             }
+            else
+                locSes = new ThreadLocalSession(sesHolder0);
         }
 
         sesHolder = sesHolder0;
 
         locStore = U.hasAnnotation(cfgStore, CacheLocalStore.class);
+
+        sesLsnrs = 
createSessionListeners(cfg.getCacheStoreSessionListenerFactories());
+
+        if (sesLsnrs == null)
+            sesLsnrs = 
createSessionListeners(ctx.config().getCacheStoreSessionListenerFactories());
+    }
+
+    /**
+     * Creates one session listener per configured factory, preserving factory order.
+     *
+     * @param factories Listener factories, or {@code null} if none are configured.
+     * @return Created listeners, or {@code null} if {@code factories} is {@code null}.
+     */
+    private CacheStoreSessionListener[] 
createSessionListeners(Factory<? extends CacheStoreSessionListener>[] factories) {
+        if (factories == null)
+            return null;
+
+        CacheStoreSessionListener[] lsnrs = new 
CacheStoreSessionListener[factories.length];
+
+        for (int i = 0; i < factories.length; i++)
+            lsnrs[i] = factories[i].create();
+
+        return lsnrs;
     }
 
     /** {@inheritDoc} */
@@ -215,14 +246,14 @@ public abstract class GridCacheStoreManagerAdapter 
extends GridCacheManagerAdapt
 
             sessionInit0(tx);
 
-            boolean thewEx = true;
+            boolean threwEx = true;
 
             Object val = null;
 
             try {
                 val = singleThreadGate.load(storeKey);
 
-                thewEx = false;
+                threwEx = false;
             }
             catch (ClassCastException e) {
                 handleClassCastException(e);
@@ -234,7 +265,7 @@ public abstract class GridCacheStoreManagerAdapter extends 
GridCacheManagerAdapt
                 throw new IgniteCheckedException(new CacheLoaderException(e));
             }
             finally {
-                sessionEnd0(tx, thewEx);
+                sessionEnd0(tx, threwEx);
             }
 
             if (log.isDebugEnabled())
@@ -349,7 +380,7 @@ public abstract class GridCacheStoreManagerAdapter extends 
GridCacheManagerAdapt
 
             sessionInit0(tx);
 
-            boolean thewEx = true;
+            boolean threwEx = true;
 
             try {
                 IgniteBiInClosure<Object, Object> c = new CI2<Object, 
Object>() {
@@ -380,7 +411,7 @@ public abstract class GridCacheStoreManagerAdapter extends 
GridCacheManagerAdapt
                 else
                     singleThreadGate.loadAll(keys0, c);
 
-                thewEx = false;
+                threwEx = false;
             }
             catch (ClassCastException e) {
                 handleClassCastException(e);
@@ -392,7 +423,7 @@ public abstract class GridCacheStoreManagerAdapter extends 
GridCacheManagerAdapt
                 throw new IgniteCheckedException(new CacheLoaderException(e));
             }
             finally {
-                sessionEnd0(tx, thewEx);
+                sessionEnd0(tx, threwEx);
             }
 
             if (log.isDebugEnabled())
@@ -408,7 +439,7 @@ public abstract class GridCacheStoreManagerAdapter extends 
GridCacheManagerAdapt
 
             sessionInit0(null);
 
-            boolean thewEx = true;
+            boolean threwEx = true;
 
             try {
                 store.loadCache(new IgniteBiInClosure<Object, Object>() {
@@ -431,7 +462,7 @@ public abstract class GridCacheStoreManagerAdapter extends 
GridCacheManagerAdapt
                     }
                 }, args);
 
-                thewEx = false;
+                threwEx = false;
             }
             catch (CacheLoaderException e) {
                 throw new IgniteCheckedException(e);
@@ -440,7 +471,7 @@ public abstract class GridCacheStoreManagerAdapter extends 
GridCacheManagerAdapt
                 throw new IgniteCheckedException(new CacheLoaderException(e));
             }
             finally {
-                sessionEnd0(null, thewEx);
+                sessionEnd0(null, threwEx);
             }
 
             if (log.isDebugEnabled())
@@ -473,12 +504,12 @@ public abstract class GridCacheStoreManagerAdapter 
extends GridCacheManagerAdapt
 
             sessionInit0(tx);
 
-            boolean thewEx = true;
+            boolean threwEx = true;
 
             try {
                 store.write(new CacheEntryImpl<>(key, locStore ? F.t(val, ver) 
: val));
 
-                thewEx = false;
+                threwEx = false;
             }
             catch (ClassCastException e) {
                 handleClassCastException(e);
@@ -490,7 +521,7 @@ public abstract class GridCacheStoreManagerAdapter extends 
GridCacheManagerAdapt
                 throw new IgniteCheckedException(new CacheWriterException(e));
             }
             finally {
-                sessionEnd0(tx, thewEx);
+                sessionEnd0(tx, threwEx);
             }
 
             if (log.isDebugEnabled())
@@ -522,12 +553,12 @@ public abstract class GridCacheStoreManagerAdapter 
extends GridCacheManagerAdapt
 
                 sessionInit0(tx);
 
-                boolean thewEx = true;
+                boolean threwEx = true;
 
                 try {
                     store.writeAll(entries);
 
-                    thewEx = false;
+                    threwEx = false;
                 }
                 catch (ClassCastException e) {
                     handleClassCastException(e);
@@ -548,7 +579,7 @@ public abstract class GridCacheStoreManagerAdapter extends 
GridCacheManagerAdapt
                     throw new IgniteCheckedException(e);
                 }
                 finally {
-                    sessionEnd0(tx, thewEx);
+                    sessionEnd0(tx, threwEx);
                 }
 
                 if (log.isDebugEnabled())
@@ -576,12 +607,12 @@ public abstract class GridCacheStoreManagerAdapter 
extends GridCacheManagerAdapt
 
             sessionInit0(tx);
 
-            boolean thewEx = true;
+            boolean threwEx = true;
 
             try {
                 store.delete(key);
 
-                thewEx = false;
+                threwEx = false;
             }
             catch (ClassCastException e) {
                 handleClassCastException(e);
@@ -593,7 +624,7 @@ public abstract class GridCacheStoreManagerAdapter extends 
GridCacheManagerAdapt
                 throw new IgniteCheckedException(new CacheWriterException(e));
             }
             finally {
-                sessionEnd0(tx, thewEx);
+                sessionEnd0(tx, threwEx);
             }
 
             if (log.isDebugEnabled())
@@ -625,12 +656,12 @@ public abstract class GridCacheStoreManagerAdapter 
extends GridCacheManagerAdapt
 
             sessionInit0(tx);
 
-            boolean thewEx = true;
+            boolean threwEx = true;
 
             try {
                 store.deleteAll(keys0);
 
-                thewEx = false;
+                threwEx = false;
             }
             catch (ClassCastException e) {
                 handleClassCastException(e);
@@ -645,7 +676,7 @@ public abstract class GridCacheStoreManagerAdapter extends 
GridCacheManagerAdapt
                 throw new IgniteCheckedException(e);
             }
             finally {
-                sessionEnd0(tx, thewEx);
+                sessionEnd0(tx, threwEx);
             }
 
             if (log.isDebugEnabled())
@@ -675,6 +706,11 @@ public abstract class GridCacheStoreManagerAdapter extends 
GridCacheManagerAdapt
         sessionInit0(tx);
 
         try {
+            if (sesLsnrs != null) {
+                for (CacheStoreSessionListener lsnr : sesLsnrs)
+                    lsnr.onSessionEnd(locSes, commit);
+            }
+
             store.sessionEnd(commit);
         }
         finally {
@@ -715,9 +751,7 @@ public abstract class GridCacheStoreManagerAdapter extends 
GridCacheManagerAdapt
      * @param tx Current transaction.
      */
     private void sessionInit0(@Nullable IgniteInternalTx tx) {
-        if (sesHolder == null)
-            return;
-
+        assert sesHolder != null;
         assert sesHolder.get() == null;
 
         SessionData ses;
@@ -738,6 +772,15 @@ public abstract class GridCacheStoreManagerAdapter extends 
GridCacheManagerAdapt
             ses = new SessionData(null, cctx.name());
 
         sesHolder.set(ses);
+
+        if (!ses.started()) {
+            if (sesLsnrs != null) {
+                for (CacheStoreSessionListener lsnr : sesLsnrs)
+                    lsnr.onSessionStart(locSes);
+            }
+
+            ses.onStarted();
+        }
     }
 
     /**
@@ -745,8 +788,14 @@ public abstract class GridCacheStoreManagerAdapter extends 
GridCacheManagerAdapt
      */
     private void sessionEnd0(@Nullable IgniteInternalTx tx, boolean threwEx) 
throws IgniteCheckedException {
         try {
-            if (tx == null)
-                store.sessionEnd(threwEx);
+            if (tx == null) {
+                if (sesLsnrs != null) {
+                    for (CacheStoreSessionListener lsnr : sesLsnrs)
+                        lsnr.onSessionEnd(locSes, !threwEx);
+                }
+
+                store.sessionEnd(!threwEx);
+            }
         }
         catch (Exception e) {
             if (!threwEx)
@@ -788,6 +837,9 @@ public abstract class GridCacheStoreManagerAdapter extends 
GridCacheManagerAdapt
         @GridToStringInclude
         private Map<Object, Object> props;
 
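+        /** Whether session start was already processed (guards against repeated {@code onSessionStart} calls). */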
+        private boolean started;
+
         /**
          * @param tx Current transaction.
          * @param cacheName Cache name.
@@ -828,6 +880,19 @@ public abstract class GridCacheStoreManagerAdapter extends 
GridCacheManagerAdapt
             this.cacheName = cacheName;
         }
 
+        /**
+         */
+        private void onStarted() {
+            started = true;
+        }
+
+        /**
+         * @return If session is started.
+         */
+        private boolean started() {
+            return started;
+        }
+
         /** {@inheritDoc} */
         @Override public String toString() {
             return S.toString(SessionData.class, this, "tx", CU.txString(tx));
@@ -839,7 +904,14 @@ public abstract class GridCacheStoreManagerAdapter extends 
GridCacheManagerAdapt
      */
     private static class ThreadLocalSession implements CacheStoreSession {
         /** */
-        private final ThreadLocal<SessionData> sesHolder = new ThreadLocal<>();
+        private final ThreadLocal<SessionData> sesHolder;
+
+        /**
+         * @param sesHolder Session holder.
+         */
+        private ThreadLocalSession(ThreadLocal<SessionData> sesHolder) {
+            this.sesHolder = sesHolder;
+        }
 
         /** {@inheritDoc} */
         @Nullable @Override public Transaction transaction() {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b97441f9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index 64cc77f..f513b59 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -437,7 +437,12 @@ public abstract class IgniteTxAdapter extends 
GridMetadataAwareAdapter
 
     /** {@inheritDoc} */
     @Override public boolean storeUsed() {
-        return storeEnabled() && store() != null;
+        if (!storeEnabled())
+            return false;
+
+        Collection<CacheStoreManager> stores = stores();
+
+        return stores != null && !stores.isEmpty();
     }
 
     /**
@@ -445,13 +450,20 @@ public abstract class IgniteTxAdapter extends 
GridMetadataAwareAdapter
      *
      * @return Store manager.
      */
-    protected CacheStoreManager store() {
-        if (!activeCacheIds().isEmpty()) {
-            int cacheId = F.first(activeCacheIds());
+    protected Collection<CacheStoreManager> stores() {
+        Collection<Integer> cacheIds = activeCacheIds();
 
-            CacheStoreManager store = cctx.cacheContext(cacheId).store();
+        if (!cacheIds.isEmpty()) {
+            Collection<CacheStoreManager> stores = new 
ArrayList<>(cacheIds.size());
+
+            for (int cacheId : cacheIds) {
+                CacheStoreManager store = cctx.cacheContext(cacheId).store();
+
+                if (store.configured())
+                    stores.add(store);
+            }
 
-            return store.configured() ? store : null;
+            return stores;
         }
 
         return null;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b97441f9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 7e9095c..40bb36e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -496,17 +496,24 @@ public abstract class IgniteTxLocalAdapter extends 
IgniteTxAdapter
      */
     @SuppressWarnings({"CatchGenericClass"})
     protected void batchStoreCommit(Iterable<IgniteTxEntry> writeEntries) 
throws IgniteCheckedException {
-        CacheStoreManager store = store();
+        if (!storeEnabled() || (internal() && !groupLock()))
+            return;
 
-        if (store != null && store.isWriteThrough() && storeEnabled() &&
-            (!internal() || groupLock()) && (near() || 
store.isWriteToStoreFromDht())) {
+        Collection<CacheStoreManager> stores = stores();
+
+        if (stores == null || stores.isEmpty())
+            return;
+
+        boolean isWriteToStoreFromDht = 
F.first(stores).isWriteToStoreFromDht();
+
+        if (near() || isWriteToStoreFromDht) {
             try {
                 if (writeEntries != null) {
                     Map<Object, IgniteBiTuple<Object, GridCacheVersion>> 
putMap = null;
                     List<Object> rmvCol = null;
                     CacheStoreManager writeStore = null;
 
-                    boolean skipNear = near() && store.isWriteToStoreFromDht();
+                    boolean skipNear = near() && isWriteToStoreFromDht;
 
                     for (IgniteTxEntry e : writeEntries) {
                         if ((skipNear && e.cached().isNear()) || e.skipStore())
@@ -560,12 +567,15 @@ public abstract class IgniteTxLocalAdapter extends 
IgniteTxAdapter
                                 val = 
cacheCtx.toCacheObject(cacheCtx.unwrapTemporary(interceptorVal));
                             }
 
-                            if (putMap == null)
-                                putMap = new 
LinkedHashMap<>(writeMap().size(), 1.0f);
+                            if (writeStore == null)
+                                writeStore = cacheCtx.store();
 
-                            putMap.put(CU.value(key, cacheCtx, false), 
F.t(CU.value(val, cacheCtx, false), ver));
+                            if (writeStore.isWriteThrough()) {
+                                if (putMap == null)
+                                    putMap = new 
LinkedHashMap<>(writeMap().size(), 1.0f);
 
-                            writeStore = cacheCtx.store();
+                                putMap.put(CU.value(key, cacheCtx, false), 
F.t(CU.value(val, cacheCtx, false), ver));
+                            }
                         }
                         else if (op == DELETE) {
                             // Batch-process all puts if needed.
@@ -597,12 +607,15 @@ public abstract class IgniteTxLocalAdapter extends 
IgniteTxAdapter
                                     continue;
                             }
 
-                            if (rmvCol == null)
-                                rmvCol = new ArrayList<>();
+                            if (writeStore == null)
+                                writeStore = cacheCtx.store();
 
-                            
rmvCol.add(key.value(cacheCtx.cacheObjectContext(), false));
+                            if (writeStore.isWriteThrough()) {
+                                if (rmvCol == null)
+                                    rmvCol = new ArrayList<>();
 
-                            writeStore = cacheCtx.store();
+                                
rmvCol.add(key.value(cacheCtx.cacheObjectContext(), false));
+                            }
                         }
                         else if (log.isDebugEnabled())
                             log.debug("Ignoring NOOP entry for batch store 
commit: " + e);
@@ -626,7 +639,8 @@ public abstract class IgniteTxLocalAdapter extends 
IgniteTxAdapter
                 }
 
                 // Commit while locks are held.
-                store.sessionEnd(this, true);
+                for (CacheStoreManager store : stores)
+                    store.sessionEnd(this, true);
             }
             catch (IgniteCheckedException ex) {
                 commitError(ex);
@@ -994,11 +1008,12 @@ public abstract class IgniteTxLocalAdapter extends 
IgniteTxAdapter
             }
         }
         else {
-            CacheStoreManager store = store();
+            Collection<CacheStoreManager> stores = stores();
 
-            if (store != null && (!internal() || groupLock())) {
+            if (stores != null && !stores.isEmpty() && (!internal() || 
groupLock())) {
                 try {
-                    store.sessionEnd(this, true);
+                    for (CacheStoreManager store : stores)
+                        store.sessionEnd(this, true);
                 }
                 catch (IgniteCheckedException e) {
                     commitError(e);
@@ -1099,11 +1114,13 @@ public abstract class IgniteTxLocalAdapter extends 
IgniteTxAdapter
 
                 cctx.tm().rollbackTx(this);
 
-                CacheStoreManager store = store();
+                Collection<CacheStoreManager> stores = stores();
 
-                if (store != null && (near() || 
store.isWriteToStoreFromDht())) {
-                    if (!internal() || groupLock())
-                        store.sessionEnd(this, false);
+                if (stores != null && !stores.isEmpty() && (near() || 
F.first(stores).isWriteToStoreFromDht())) {
+                    if (!internal() || groupLock()) {
+                        for (CacheStoreManager store : stores)
+                            store.sessionEnd(this, false);
+                    }
                 }
             }
             catch (Error | IgniteCheckedException | RuntimeException e) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b97441f9/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerAbstractSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerAbstractSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerAbstractSelfTest.java
new file mode 100644
index 0000000..5a01c2d
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerAbstractSelfTest.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+import org.apache.ignite.transactions.*;
+
+import javax.cache.configuration.*;
+import java.io.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ * Tests for store session listeners.
+ */
+public abstract class CacheStoreSessionListenerAbstractSelfTest extends 
GridCommonAbstractTest implements Serializable {
+    /** */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new 
TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    protected static final AtomicInteger loadCacheCnt = new AtomicInteger();
+
+    /** */
+    protected static final AtomicInteger loadCnt = new AtomicInteger();
+
+    /** */
+    protected static final AtomicInteger writeCnt = new AtomicInteger();
+
+    /** */
+    protected static final AtomicInteger deleteCnt = new AtomicInteger();
+
+    /** */
+    protected static final AtomicInteger reuseCnt = new AtomicInteger();
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        disco.setIpFinder(IP_FINDER);
+
+        cfg.setDiscoverySpi(disco);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        startGridsMultiThreaded(3);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        loadCacheCnt.set(0);
+        loadCnt.set(0);
+        writeCnt.set(0);
+        deleteCnt.set(0);
+        reuseCnt.set(0);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicCache() throws Exception {
+        CacheConfiguration<Integer, Integer> cfg = cacheConfiguration(null, 
CacheAtomicityMode.ATOMIC);
+
+        try (IgniteCache<Integer, Integer> cache = ignite(0).createCache(cfg)) 
{
+            cache.loadCache(null);
+            cache.get(1);
+            cache.put(1, 1);
+            cache.remove(1);
+        }
+
+        assertEquals(3, loadCacheCnt.get());
+        assertEquals(1, loadCnt.get());
+        assertEquals(1, writeCnt.get());
+        assertEquals(1, deleteCnt.get());
+        assertEquals(0, reuseCnt.get());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTransactionalCache() throws Exception {
+        CacheConfiguration<Integer, Integer> cfg = cacheConfiguration(null, 
CacheAtomicityMode.TRANSACTIONAL);
+
+        try (IgniteCache<Integer, Integer> cache = ignite(0).createCache(cfg)) 
{
+            cache.loadCache(null);
+            cache.get(1);
+            cache.put(1, 1);
+            cache.remove(1);
+        }
+
+        assertEquals(3, loadCacheCnt.get());
+        assertEquals(1, loadCnt.get());
+        assertEquals(1, writeCnt.get());
+        assertEquals(1, deleteCnt.get());
+        assertEquals(0, reuseCnt.get());
+
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testExplicitTransaction() throws Exception {
+        CacheConfiguration<Integer, Integer> cfg = cacheConfiguration(null, 
CacheAtomicityMode.TRANSACTIONAL);
+
+        try (IgniteCache<Integer, Integer> cache = ignite(0).createCache(cfg)) 
{
+            try (Transaction tx = ignite(0).transactions().txStart()) {
+                cache.put(1, 1);
+                cache.put(2, 2);
+                cache.remove(3);
+                cache.remove(4);
+
+                tx.commit();
+            }
+        }
+
+        assertEquals(2, writeCnt.get());
+        assertEquals(2, deleteCnt.get());
+        assertEquals(3, reuseCnt.get());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCrossCacheTransaction() throws Exception {
+        CacheConfiguration<Integer, Integer> cfg1 = 
cacheConfiguration("cache1", CacheAtomicityMode.TRANSACTIONAL);
+        CacheConfiguration<Integer, Integer> cfg2 = 
cacheConfiguration("cache2", CacheAtomicityMode.TRANSACTIONAL);
+
+        try (
+            IgniteCache<Integer, Integer> cache1 = ignite(0).createCache(cfg1);
+            IgniteCache<Integer, Integer> cache2 = ignite(0).createCache(cfg2)
+        ) {
+            try (Transaction tx = ignite(0).transactions().txStart()) {
+                cache1.put(1, 1);
+                cache2.put(2, 2);
+                cache1.remove(3);
+                cache2.remove(4);
+
+                tx.commit();
+            }
+        }
+
+        assertEquals(2, writeCnt.get());
+        assertEquals(2, deleteCnt.get());
+        assertEquals(3, reuseCnt.get());
+    }
+
+    /**
+     * @param name Cache name.
+     * @param atomicity Atomicity mode.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration<Integer, Integer> cacheConfiguration(String 
name, CacheAtomicityMode atomicity) {
+        CacheConfiguration<Integer, Integer> cfg = new CacheConfiguration<>();
+
+        cfg.setName(name);
+        cfg.setAtomicityMode(atomicity);
+        cfg.setCacheStoreFactory(storeFactory());
+        cfg.setCacheStoreSessionListenerFactories(sessionListenerFactory());
+        cfg.setReadThrough(true);
+        cfg.setWriteThrough(true);
+        cfg.setLoadPreviousValue(true);
+
+        return cfg;
+    }
+
+    /**
+     * @return Store factory.
+     */
+    protected abstract Factory<? extends CacheStore<Integer, Integer>> 
storeFactory();
+
+    /**
+     * @return Session listener factory.
+     */
+    protected abstract Factory<CacheStoreSessionListener> 
sessionListenerFactory();
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b97441f9/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheStoreSessionJdbcListenerSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheStoreSessionJdbcListenerSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheStoreSessionJdbcListenerSelfTest.java
new file mode 100644
index 0000000..9020e0d
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheStoreSessionJdbcListenerSelfTest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.jdbc;
+
+import org.apache.ignite.cache.store.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.resources.*;
+import org.h2.jdbcx.*;
+
+import javax.cache.*;
+import javax.cache.configuration.*;
+import javax.cache.integration.*;
+import java.sql.*;
+import java.util.*;
+
+/**
+ * Tests for {@link CacheStoreSessionJdbcListener}.
+ */
+public class CacheStoreSessionJdbcListenerSelfTest extends 
CacheStoreSessionListenerAbstractSelfTest {
+    /** {@inheritDoc} */
+    @Override protected Factory<? extends CacheStore<Integer, Integer>> 
storeFactory() {
+        return new Factory<CacheStore<Integer, Integer>>() {
+            @Override public CacheStore<Integer, Integer> create() {
+                return new Store();
+            }
+        };
+    }
+
+    /** {@inheritDoc} */
+    @Override protected Factory<CacheStoreSessionListener> 
sessionListenerFactory() {
+        return new Factory<CacheStoreSessionListener>() {
+            @Override public CacheStoreSessionListener create() {
+                CacheStoreSessionJdbcListener lsnr = new 
CacheStoreSessionJdbcListener();
+
+                
lsnr.setDataSource(JdbcConnectionPool.create("jdbc:h2:mem:example;DB_CLOSE_DELAY=-1",
 "", ""));
+
+                return lsnr;
+            }
+        };
+    }
+
+    /**
+     */
+    private static class Store extends CacheStoreAdapter<Integer, Integer> {
+        /** */
+        private static String SES_CONN_KEY = "ses_conn";
+
+        /** */
+        @CacheStoreSessionResource
+        private CacheStoreSession ses;
+
+        /** {@inheritDoc} */
+        @Override public void loadCache(IgniteBiInClosure<Integer, Integer> 
clo, Object... args) {
+            loadCacheCnt.incrementAndGet();
+
+            checkConnection();
+        }
+
+        /** {@inheritDoc} */
+        @Override public Integer load(Integer key) throws CacheLoaderException 
{
+            loadCnt.incrementAndGet();
+
+            checkConnection();
+
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void write(Cache.Entry<? extends Integer, ? extends 
Integer> entry)
+            throws CacheWriterException {
+            writeCnt.incrementAndGet();
+
+            checkConnection();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void delete(Object key) throws CacheWriterException {
+            deleteCnt.incrementAndGet();
+
+            checkConnection();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void sessionEnd(boolean commit) {
+            assertNull(connection());
+        }
+
+        /**
+         */
+        private void checkConnection() {
+            Connection conn = connection();
+
+            assertNotNull(conn);
+
+            try {
+                assertFalse(conn.isClosed());
+                assertFalse(conn.getAutoCommit());
+            }
+            catch (SQLException e) {
+                throw new RuntimeException(e);
+            }
+
+            verifySameInstance(conn);
+        }
+
+        /**
+         * @param conn Connection.
+         */
+        private void verifySameInstance(Connection conn) {
+            Map<String, Connection> props = ses.properties();
+
+            Connection sesConn = props.get(SES_CONN_KEY);
+
+            if (sesConn == null)
+                props.put(SES_CONN_KEY, conn);
+            else {
+                assertSame(conn, sesConn);
+
+                reuseCnt.incrementAndGet();
+            }
+        }
+
+        /**
+         * @return Connection.
+         */
+        private Connection connection() {
+            return ses.<String, 
Connection>properties().get(CacheStoreSessionJdbcListener.JDBC_CONN_KEY);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b97441f9/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
index aaf7e5b..afb67f5 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.testsuites;
 
 import junit.framework.*;
+import org.apache.ignite.cache.store.jdbc.*;
 import org.apache.ignite.internal.processors.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.*;
@@ -130,6 +131,8 @@ public class IgniteCacheTestSuite4 extends TestSuite {
 
         suite.addTestSuite(CacheOffheapMapEntrySelfTest.class);
 
+        suite.addTestSuite(CacheStoreSessionJdbcListenerSelfTest.class);
+
         return suite;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b97441f9/modules/hibernate/src/main/java/org/apache/ignite/cache/store/hibernate/CacheStoreSessionHibernateListener.java
----------------------------------------------------------------------
diff --git 
a/modules/hibernate/src/main/java/org/apache/ignite/cache/store/hibernate/CacheStoreSessionHibernateListener.java
 
b/modules/hibernate/src/main/java/org/apache/ignite/cache/store/hibernate/CacheStoreSessionHibernateListener.java
new file mode 100644
index 0000000..eff5e6c
--- /dev/null
+++ 
b/modules/hibernate/src/main/java/org/apache/ignite/cache/store/hibernate/CacheStoreSessionHibernateListener.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.hibernate;
+
+import org.apache.ignite.cache.store.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.resources.*;
+import org.hibernate.*;
+
+import javax.cache.integration.*;
+import java.util.*;
+
+/**
+ * Cache store session listener that manages a Hibernate session (and transaction) per store session.
+ */
+public class CacheStoreSessionHibernateListener implements 
CacheStoreSessionListener {
+    /** Session key for Hibernate session. */
+    public static final String HIBERNATE_SES_KEY = "__hibernate_ses_";
+
+    /** Hibernate session factory. */
+    private SessionFactory sesFactory;
+
+    /** Store session. */
+    @CacheStoreSessionResource
+    private CacheStoreSession ses;
+
+    /**
+     * Sets Hibernate session factory.
+     *
+     * @param sesFactory Session factory.
+     */
+    public void setSessionFactory(SessionFactory sesFactory) {
+        A.notNull(sesFactory, "sesFactory");
+
+        this.sesFactory = sesFactory;
+    }
+
+    /**
+     * Gets Hibernate session factory.
+     *
+     * @return Session factory.
+     */
+    public SessionFactory getSessionFactory() {
+        return sesFactory;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onSessionStart(CacheStoreSession ses) {
+        Map<String, Session> props = ses.properties();
+
+        if (!props.containsKey(HIBERNATE_SES_KEY)) {
+            try {
+                Session hibSes = sesFactory.openSession();
+
+                props.put(HIBERNATE_SES_KEY, hibSes);
+
+                if (ses.isWithinTransaction())
+                    hibSes.beginTransaction();
+            }
+            catch (HibernateException e) {
+                throw new CacheWriterException("Failed to start store session 
[tx=" + ses.transaction() + ']', e);
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onSessionEnd(CacheStoreSession ses, boolean commit) {
+        Session hibSes = ses.<String, 
Session>properties().remove(HIBERNATE_SES_KEY);
+
+        if (hibSes != null) {
+            try {
+                Transaction tx = hibSes.getTransaction();
+                // Session.getTransaction() never returns null, so act only on a transaction that was begun.
+                if (commit) {
+                    hibSes.flush();
+
+                    if (tx.isActive())
+                        tx.commit();
+                }
+                else if (tx.isActive())
+                    tx.rollback();
+            }
+            catch (HibernateException e) {
+                throw new CacheWriterException("Failed to end store session 
[tx=" + ses.transaction() + ']', e);
+            }
+            finally {
+                hibSes.close();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b97441f9/modules/hibernate/src/test/java/org/apache/ignite/cache/store/hibernate/CacheStoreSessionHibernateListenerSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/hibernate/src/test/java/org/apache/ignite/cache/store/hibernate/CacheStoreSessionHibernateListenerSelfTest.java
 
b/modules/hibernate/src/test/java/org/apache/ignite/cache/store/hibernate/CacheStoreSessionHibernateListenerSelfTest.java
new file mode 100644
index 0000000..85b0b95
--- /dev/null
+++ 
b/modules/hibernate/src/test/java/org/apache/ignite/cache/store/hibernate/CacheStoreSessionHibernateListenerSelfTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.hibernate;
+
+import org.apache.ignite.cache.store.*;
+import org.apache.ignite.cache.store.jdbc.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.resources.*;
+import org.hibernate.*;
+import org.hibernate.cfg.Configuration;
+import org.hibernate.service.*;
+
+import javax.cache.Cache;
+import javax.cache.configuration.*;
+import javax.cache.integration.*;
+import java.util.*;
+
+/**
+ * Tests for {@link CacheStoreSessionHibernateListener}.
+ */
+public class CacheStoreSessionHibernateListenerSelfTest extends 
CacheStoreSessionListenerAbstractSelfTest {
+    /** {@inheritDoc} */
+    @Override protected Factory<? extends CacheStore<Integer, Integer>> 
storeFactory() {
+        return new Factory<CacheStore<Integer, Integer>>() {
+            @Override public CacheStore<Integer, Integer> create() {
+                return new Store();
+            }
+        };
+    }
+
+    /** {@inheritDoc} */
+    @Override protected Factory<CacheStoreSessionListener> 
sessionListenerFactory() {
+        return new Factory<CacheStoreSessionListener>() {
+            @Override public CacheStoreSessionListener create() {
+                CacheStoreSessionHibernateListener lsnr = new 
CacheStoreSessionHibernateListener();
+
+                Configuration cfg = new Configuration().
+                    setProperty("hibernate.dialect", 
"org.hibernate.dialect.H2Dialect").
+                    setProperty("hibernate.connection.datasource", 
"jdbc:h2:mem:example;DB_CLOSE_DELAY=-1");
+
+                lsnr.setSessionFactory(cfg.buildSessionFactory(new 
ServiceRegistryBuilder().buildServiceRegistry()));
+
+                return lsnr;
+            }
+        };
+    }
+
+    /**
+     */
+    private static class Store extends CacheStoreAdapter<Integer, Integer> {
+        /** */
+        private static String SES_CONN_KEY = "ses_conn";
+
+        /** */
+        @CacheStoreSessionResource
+        private CacheStoreSession ses;
+
+        /** {@inheritDoc} */
+        @Override public void loadCache(IgniteBiInClosure<Integer, Integer> 
clo, Object... args) {
+            loadCacheCnt.incrementAndGet();
+
+            checkSession();
+        }
+
+        /** {@inheritDoc} */
+        @Override public Integer load(Integer key) throws CacheLoaderException 
{
+            loadCnt.incrementAndGet();
+
+            checkSession();
+
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void write(Cache.Entry<? extends Integer, ? extends 
Integer> entry)
+            throws CacheWriterException {
+            writeCnt.incrementAndGet();
+
+            checkSession();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void delete(Object key) throws CacheWriterException {
+            deleteCnt.incrementAndGet();
+
+            checkSession();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void sessionEnd(boolean commit) {
+            assertNull(session());
+        }
+
+        /**
+         */
+        private void checkSession() {
+            Session hibSes = session();
+
+            assertNotNull(hibSes);
+
+            assertTrue(hibSes.isOpen());
+            // Session.getTransaction() never returns null, so verify transaction activity instead.
+            if (ses.isWithinTransaction())
+                assertTrue(hibSes.getTransaction().isActive());
+            else
+                assertFalse(hibSes.getTransaction().isActive());
+
+            verifySameInstance(hibSes);
+        }
+
+        /**
+         * @param hibSes Session.
+         */
+        private void verifySameInstance(Session hibSes) {
+            Map<String, Session> props = ses.properties();
+
+            Session sesConn = props.get(SES_CONN_KEY);
+
+            if (sesConn == null)
+                props.put(SES_CONN_KEY, hibSes);
+            else {
+                assertSame(hibSes, sesConn);
+
+                reuseCnt.incrementAndGet();
+            }
+        }
+
+        /**
+         * @return Hibernate session.
+         */
+        private Session session() {
+            return ses.<String, 
Session>properties().get(CacheStoreSessionHibernateListener.HIBERNATE_SES_KEY);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b97441f9/modules/spring/pom.xml
----------------------------------------------------------------------
diff --git a/modules/spring/pom.xml b/modules/spring/pom.xml
index 8494ad0..58f4356 100644
--- a/modules/spring/pom.xml
+++ b/modules/spring/pom.xml
@@ -103,6 +103,20 @@
             <type>test-jar</type>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.springframework</groupId>
+            <artifactId>spring-jdbc</artifactId>
+            <version>${spring.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.h2database</groupId>
+            <artifactId>h2</artifactId>
+            <version>1.3.175</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b97441f9/modules/spring/src/main/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListener.java
----------------------------------------------------------------------
diff --git 
a/modules/spring/src/main/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListener.java
 
b/modules/spring/src/main/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListener.java
new file mode 100644
index 0000000..a2cf622
--- /dev/null
+++ 
b/modules/spring/src/main/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListener.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.spring;
+
+import org.apache.ignite.cache.store.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.transactions.*;
+import org.springframework.transaction.*;
+import org.springframework.transaction.support.*;
+
+import javax.cache.integration.*;
+
+/**
+ * Cache store session listener that runs transactional store sessions in a Spring-managed transaction.
+ */
+public class CacheStoreSessionSpringListener implements 
CacheStoreSessionListener {
+    /** Session key for transaction status. */
+    public static final String TX_STATUS_KEY = "__spring_tx_status_";
+
+    /** Transaction manager. */
+    private PlatformTransactionManager txMgr;
+
+    /**
+     * Sets transaction manager.
+     *
+     * @param txMgr Transaction manager.
+     */
+    public void setTransactionManager(PlatformTransactionManager txMgr) {
+        A.notNull(txMgr, "txMgr");
+
+        this.txMgr = txMgr;
+    }
+
+    /**
+     * Gets transaction manager.
+     *
+     * @return Transaction manager.
+     */
+    public PlatformTransactionManager getTransactionManager() {
+        return txMgr;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onSessionStart(CacheStoreSession ses) {
+        if (ses.isWithinTransaction()) {
+            try {
+                ses.properties().put(TX_STATUS_KEY, 
txMgr.getTransaction(definition(ses.transaction())));
+            }
+            catch (TransactionException e) {
+                throw new CacheWriterException("Failed to start store session 
[tx=" + ses.transaction() + ']', e);
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onSessionEnd(CacheStoreSession ses, boolean commit) {
+        if (ses.isWithinTransaction()) {
+            TransactionStatus tx = ses.<String, 
TransactionStatus>properties().remove(TX_STATUS_KEY);
+
+            if (tx != null) {
+                try {
+                    if (commit)
+                        txMgr.commit(tx);
+                    else
+                        txMgr.rollback(tx);
+                }
+                catch (TransactionException e) {
+                    throw new CacheWriterException("Failed to end store 
session [tx=" + ses.transaction() + ']', e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Creates Spring transaction definition whose isolation matches the ongoing cache transaction.
+     *
+     * @return Transaction definition.
+     */
+    private TransactionDefinition definition(Transaction tx) {
+        assert tx != null;
+
+        DefaultTransactionDefinition def = new DefaultTransactionDefinition();
+
+        def.setIsolationLevel(isolationLevel(tx.isolation()));
+
+        return def;
+    }
+
+    /**
+     * Gets DB transaction isolation level based on ongoing cache transaction 
isolation.
+     *
+     * @param isolation Cache transaction isolation.
+     * @return DB transaction isolation.
+     */
+    private int isolationLevel(TransactionIsolation isolation) {
+        switch (isolation) {
+            case READ_COMMITTED:
+                return TransactionDefinition.ISOLATION_READ_COMMITTED;
+
+            case REPEATABLE_READ:
+                return TransactionDefinition.ISOLATION_REPEATABLE_READ;
+
+            case SERIALIZABLE:
+                return TransactionDefinition.ISOLATION_SERIALIZABLE;
+
+            default:
+                throw new IllegalStateException(); // Will never happen.
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b97441f9/modules/spring/src/test/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListenerSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/spring/src/test/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListenerSelfTest.java
 
b/modules/spring/src/test/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListenerSelfTest.java
new file mode 100644
index 0000000..a7ca317
--- /dev/null
+++ 
b/modules/spring/src/test/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListenerSelfTest.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.spring;
+
+import org.apache.ignite.cache.store.*;
+import org.apache.ignite.cache.store.jdbc.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.resources.*;
+import org.springframework.jdbc.core.*;
+import org.springframework.jdbc.datasource.*;
+import org.springframework.transaction.*;
+
+import javax.cache.*;
+import javax.cache.configuration.*;
+import javax.cache.integration.*;
+import javax.sql.*;
+import java.sql.*;
+import java.util.*;
+
+/**
+ * Tests for {@link CacheStoreSessionSpringListener}.
+ */
+public class CacheStoreSessionSpringListenerSelfTest extends 
CacheStoreSessionListenerAbstractSelfTest {
+    /** */
+    private static final DataSource DATA_SRC = new 
DriverManagerDataSource("jdbc:h2:mem:example;DB_CLOSE_DELAY=-1");
+
+    /** {@inheritDoc} */
+    @Override protected Factory<? extends CacheStore<Integer, Integer>> 
storeFactory() {
+        return new Factory<CacheStore<Integer, Integer>>() {
+            @Override public CacheStore<Integer, Integer> create() {
+                return new Store(new JdbcTemplate(DATA_SRC));
+            }
+        };
+    }
+
+    /** {@inheritDoc} */
+    @Override protected Factory<CacheStoreSessionListener> 
sessionListenerFactory() {
+        return new Factory<CacheStoreSessionListener>() {
+            @Override public CacheStoreSessionListener create() {
+                CacheStoreSessionSpringListener lsnr = new 
CacheStoreSessionSpringListener();
+
+                lsnr.setTransactionManager(new 
DataSourceTransactionManager(DATA_SRC));
+
+                return lsnr;
+            }
+        };
+    }
+
+    /**
+     */
+    private static class Store extends CacheStoreAdapter<Integer, Integer> {
+        /** */
+        private static String SES_CONN_KEY = "ses_conn";
+
+        /** */
+        private final JdbcTemplate jdbc;
+
+        /** */
+        @CacheStoreSessionResource
+        private CacheStoreSession ses;
+
+        /**
+         * @param jdbc JDBC template.
+         */
+        private Store(JdbcTemplate jdbc) {
+            this.jdbc = jdbc;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void loadCache(IgniteBiInClosure<Integer, Integer> 
clo, Object... args) {
+            loadCacheCnt.incrementAndGet();
+
+            checkTransaction();
+            checkConnection();
+        }
+
+        /** {@inheritDoc} */
+        @Override public Integer load(Integer key) throws CacheLoaderException 
{
+            loadCnt.incrementAndGet();
+
+            checkTransaction();
+            checkConnection();
+
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void write(Cache.Entry<? extends Integer, ? extends 
Integer> entry)
+            throws CacheWriterException {
+            writeCnt.incrementAndGet();
+
+            checkTransaction();
+            checkConnection();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void delete(Object key) throws CacheWriterException {
+            deleteCnt.incrementAndGet();
+
+            checkTransaction();
+            checkConnection();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void sessionEnd(boolean commit) {
+            assertNull(transaction());
+        }
+
+        /**
+         */
+        private void checkTransaction() {
+            TransactionStatus tx = transaction();
+
+            if (ses.isWithinTransaction()) {
+                assertNotNull(tx);
+                assertFalse(tx.isCompleted());
+            }
+            else
+                assertNull(tx);
+        }
+
+        /**
+         * @return Transaction status.
+         */
+        private TransactionStatus transaction() {
+            return ses.<String, 
TransactionStatus>properties().get(CacheStoreSessionSpringListener.TX_STATUS_KEY);
+        }
+
+        /**
+         */
+        private void checkConnection() {
+            Connection conn = 
DataSourceUtils.getConnection(jdbc.getDataSource());
+
+            assertNotNull(conn);
+
+            try {
+                assertFalse(conn.isClosed());
+                assertEquals(!ses.isWithinTransaction(), conn.getAutoCommit());
+            }
+            catch (SQLException e) {
+                throw new RuntimeException(e);
+            }
+
+            verifySameInstance(conn);
+        }
+
+        /**
+         * @param conn Connection.
+         */
+        private void verifySameInstance(Connection conn) {
+            Map<String, Connection> props = ses.properties();
+
+            Connection sesConn = props.get(SES_CONN_KEY);
+
+            if (sesConn == null)
+                props.put(SES_CONN_KEY, conn);
+            else {
+                assertSame(conn, sesConn);
+
+                reuseCnt.incrementAndGet();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b97441f9/modules/spring/src/test/java/org/apache/ignite/testsuites/IgniteSpringTestSuite.java
----------------------------------------------------------------------
diff --git 
a/modules/spring/src/test/java/org/apache/ignite/testsuites/IgniteSpringTestSuite.java
 
b/modules/spring/src/test/java/org/apache/ignite/testsuites/IgniteSpringTestSuite.java
index 8251c18..0b7e471 100644
--- 
a/modules/spring/src/test/java/org/apache/ignite/testsuites/IgniteSpringTestSuite.java
+++ 
b/modules/spring/src/test/java/org/apache/ignite/testsuites/IgniteSpringTestSuite.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.testsuites;
 
 import junit.framework.*;
+import org.apache.ignite.cache.store.spring.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.p2p.*;
 import org.apache.ignite.spring.*;
@@ -47,6 +48,8 @@ public class IgniteSpringTestSuite extends TestSuite {
 
         suite.addTest(new 
TestSuite(IgniteStartFromStreamConfigurationTest.class));
 
+        suite.addTestSuite(CacheStoreSessionSpringListenerSelfTest.class);
+
         return suite;
     }
 }

Reply via email to