Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-891 [created] b3d608367


# IGNITE-891 - Cache store refactoring


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b3d60836
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b3d60836
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b3d60836

Branch: refs/heads/ignite-891
Commit: b3d6083673b4226e1e64f1448b24bd2a82beac91
Parents: f6012f1
Author: Valentin Kulichenko <vkuliche...@gridgain.com>
Authored: Mon May 11 23:32:41 2015 -0700
Committer: Valentin Kulichenko <vkuliche...@gridgain.com>
Committed: Mon May 11 23:32:41 2015 -0700

----------------------------------------------------------------------
 .../store/jdbc/CacheJdbcPersonStore.java        | 146 +++++--------------
 .../cache/store/CacheLoadOnlyStoreAdapter.java  |   6 +
 .../apache/ignite/cache/store/CacheStore.java   |  10 ++
 .../ignite/cache/store/CacheStoreAdapter.java   |  20 +++
 .../store/jdbc/CacheAbstractJdbcStore.java      |   6 +
 .../store/tx/CacheStoreJdbcSessionListener.java |  84 +++++++++++
 .../store/tx/CacheStoreSessionListener.java     |  40 +++++
 .../configuration/CacheConfiguration.java       |   1 -
 .../configuration/IgniteReflectionFactory.java  |  39 ++++-
 .../cache/CacheStoreBalancingWrapper.java       |   6 +
 .../cache/GridCacheLoaderWriterStore.java       |   6 +
 .../store/GridCacheStoreManagerAdapter.java     |  96 +++++++-----
 .../cache/store/GridCacheWriteBehindStore.java  |   6 +
 .../store/GridCacheBalancingStoreSelfTest.java  |   6 +
 .../cache/store/GridGeneratingTestStore.java    |   6 +
 .../IgniteCacheExpiryStoreLoadSelfTest.java     |   6 +
 .../GridCacheAbstractLocalStoreSelfTest.java    |   6 +
 ...idCacheConfigurationConsistencySelfTest.java |   6 +
 .../cache/GridCacheGenericTestStore.java        |   6 +
 .../cache/GridCacheLifecycleAwareSelfTest.java  |   6 +
 .../cache/GridCacheStorePutxSelfTest.java       |   7 +-
 .../processors/cache/GridCacheTestStore.java    |   6 +
 .../IgniteTxStoreExceptionAbstractSelfTest.java |   6 +
 .../IgniteCrossCacheTxStoreSelfTest.java        |   6 +
 .../IgniteCacheStoreSessionAbstractTest.java    |   6 +
 ...acheStoreSessionWriteBehindAbstractTest.java |   6 +
 modules/spring/pom.xml                          |   6 +
 .../tx/CacheStoreSpringSessionListener.java     |  84 +++++++++++
 28 files changed, 482 insertions(+), 153 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b3d60836/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/CacheJdbcPersonStore.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/CacheJdbcPersonStore.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/CacheJdbcPersonStore.java
index 791f861..d930615 100644
--- a/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/CacheJdbcPersonStore.java
+++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/CacheJdbcPersonStore.java
@@ -19,15 +19,16 @@ package org.apache.ignite.examples.datagrid.store.jdbc;
 
 import org.apache.ignite.*;
 import org.apache.ignite.cache.store.*;
+import org.apache.ignite.cache.store.tx.*;
 import org.apache.ignite.examples.datagrid.store.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.resources.*;
-import org.jetbrains.annotations.*;
+import org.springframework.jdbc.datasource.*;
 
 import javax.cache.*;
 import javax.cache.integration.*;
+import javax.sql.*;
 import java.sql.*;
-import java.util.*;
 
 /**
  * Example of {@link CacheStore} implementation that uses JDBC
@@ -35,8 +36,11 @@ import java.util.*;
  *
  */
 public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> {
-    /** Transaction metadata attribute name. */
-    private static final String ATTR_NAME = "SIMPLE_STORE_CONNECTION";
+    /** Database URL. */
+    private static final String DB_URL = "jdbc:h2:mem:example;DB_CLOSE_DELAY=-1";
+
+    /** Data source. */
+    private DataSource dataSrc;
 
     /** Auto-injected store session. */
     @CacheStoreSessionResource
@@ -48,6 +52,10 @@ public class CacheJdbcPersonStore extends 
CacheStoreAdapter<Long, Person> {
      * @throws IgniteException If failed.
      */
     public CacheJdbcPersonStore() throws IgniteException {
+        dataSrc = new DriverManagerDataSource(DB_URL);
+
+        setSessionListener(new CacheStoreJdbcSessionListener(dataSrc));
+
         prepareDb();
     }
 
@@ -58,11 +66,9 @@ public class CacheJdbcPersonStore extends 
CacheStoreAdapter<Long, Person> {
      * @throws IgniteException If failed.
      */
     private void prepareDb() throws IgniteException {
-        try (Connection conn = openConnection(false); Statement st = conn.createStatement()) {
+        try (Connection conn = connection(); Statement st = conn.createStatement()) {
             st.execute("create table if not exists PERSONS (id number unique, firstName varchar(255), " +
-                "lastName varchar(255))");
-
-            conn.commit();
+                "" + "lastName varchar(255))");
         }
         catch (SQLException e) {
             throw new IgniteException("Failed to create database table.", e);
@@ -70,32 +76,11 @@ public class CacheJdbcPersonStore extends 
CacheStoreAdapter<Long, Person> {
     }
 
     /** {@inheritDoc} */
-    @Override public void sessionEnd(boolean commit) {
-        Map<String, Connection> props = ses.properties();
-
-        try (Connection conn = props.remove(ATTR_NAME)) {
-            if (conn != null) {
-                if (commit)
-                    conn.commit();
-                else
-                    conn.rollback();
-            }
-
-            System.out.println(">>> Transaction ended [commit=" + commit + 
']');
-        }
-        catch (SQLException e) {
-            throw new CacheWriterException("Failed to end transaction: " + 
ses.transaction(), e);
-        }
-    }
-
-    /** {@inheritDoc} */
     @Override public Person load(Long key) {
         System.out.println(">>> Loading key: " + key);
 
-        Connection conn = null;
-
         try {
-            conn = connection();
+            Connection conn = connection();
 
             try (PreparedStatement st = conn.prepareStatement("select * from 
PERSONS where id=?")) {
                 st.setString(1, key.toString());
@@ -109,9 +94,6 @@ public class CacheJdbcPersonStore extends 
CacheStoreAdapter<Long, Person> {
         catch (SQLException e) {
             throw new CacheLoaderException("Failed to load object: " + key, e);
         }
-        finally {
-            end(conn);
-        }
 
         return null;
     }
@@ -124,10 +106,8 @@ public class CacheJdbcPersonStore extends 
CacheStoreAdapter<Long, Person> {
 
         System.out.println(">>> Putting [key=" + key + ", val=" + val +  ']');
 
-        Connection conn = null;
-
         try {
-            conn = connection();
+            Connection conn = connection();
 
             int updated;
 
@@ -157,19 +137,14 @@ public class CacheJdbcPersonStore extends 
CacheStoreAdapter<Long, Person> {
         catch (SQLException e) {
             throw new CacheLoaderException("Failed to put object [key=" + key 
+ ", val=" + val + ']', e);
         }
-        finally {
-            end(conn);
-        }
     }
 
     /** {@inheritDoc} */
     @Override public void delete(Object key) {
         System.out.println(">>> Removing key: " + key);
 
-        Connection conn = null;
-
         try {
-            conn = connection();
+            Connection conn = connection();
 
             try (PreparedStatement st = conn.prepareStatement("delete from 
PERSONS where id=?")) {
                 st.setLong(1, (Long)key);
@@ -180,9 +155,6 @@ public class CacheJdbcPersonStore extends 
CacheStoreAdapter<Long, Person> {
         catch (SQLException e) {
             throw new CacheWriterException("Failed to remove object: " + key, 
e);
         }
-        finally {
-            end(conn);
-        }
     }
 
     /** {@inheritDoc} */
@@ -192,84 +164,32 @@ public class CacheJdbcPersonStore extends 
CacheStoreAdapter<Long, Person> {
 
         final int entryCnt = (Integer)args[0];
 
-        try (Connection conn = connection()) {
-            try (PreparedStatement st = conn.prepareStatement("select * from 
PERSONS")) {
-                try (ResultSet rs = st.executeQuery()) {
-                    int cnt = 0;
-
-                    while (cnt < entryCnt && rs.next()) {
-                        Person person = new Person(rs.getLong(1), 
rs.getString(2), rs.getString(3));
+        try (
+            Connection conn = connection();
+            PreparedStatement st = conn.prepareStatement("select * from 
PERSONS");
+            ResultSet rs = st.executeQuery()
+        ) {
+            int cnt = 0;
 
-                        clo.apply(person.getId(), person);
+            while (cnt < entryCnt && rs.next()) {
+                Person person = new Person(rs.getLong(1), rs.getString(2), 
rs.getString(3));
 
-                        cnt++;
-                    }
+                clo.apply(person.getId(), person);
 
-                    System.out.println(">>> Loaded " + cnt + " values into 
cache.");
-                }
+                cnt++;
             }
+
+            System.out.println(">>> Loaded " + cnt + " values into cache.");
         }
         catch (SQLException e) {
             throw new CacheLoaderException("Failed to load values from cache 
store.", e);
         }
     }
 
-    /**
-     * @return Connection.
-     * @throws SQLException In case of error.
-     */
-    private Connection connection() throws SQLException  {
-        // If there is an ongoing transaction,
-        // we must reuse the same connection.
-        if (ses.isWithinTransaction()) {
-            Map<Object, Object> props = ses.properties();
-
-            Connection conn = (Connection)props.get(ATTR_NAME);
-
-            if (conn == null) {
-                conn = openConnection(false);
-
-                // Store connection in session properties, so it can be 
accessed
-                // for other operations on the same transaction.
-                props.put(ATTR_NAME, conn);
-            }
-
-            return conn;
-        }
-        // Transaction can be null in case of simple load or put operation.
+    private Connection connection() throws SQLException {
+        if (ses != null && ses.isWithinTransaction())
+            return ses.<String, Connection>properties().get(CacheStoreJdbcSessionListener.CONN_KEY);
         else
-            return openConnection(true);
-    }
-
-    /**
-     * Closes allocated resources depending on transaction status.
-     *
-     * @param conn Allocated connection.
-     */
-    private void end(@Nullable Connection conn) {
-        if (!ses.isWithinTransaction() && conn != null) {
-            // Close connection right away if there is no transaction.
-            try {
-                conn.close();
-            }
-            catch (SQLException ignored) {
-                // No-op.
-            }
-        }
-    }
-
-    /**
-     * Gets connection from a pool.
-     *
-     * @param autocommit {@code true} If connection should use autocommit mode.
-     * @return Pooled connection.
-     * @throws SQLException In case of error.
-     */
-    private Connection openConnection(boolean autocommit) throws SQLException {
-        Connection conn = 
DriverManager.getConnection("jdbc:h2:mem:example;DB_CLOSE_DELAY=-1");
-
-        conn.setAutoCommit(autocommit);
-
-        return conn;
+            return dataSrc.getConnection();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b3d60836/modules/core/src/main/java/org/apache/ignite/cache/store/CacheLoadOnlyStoreAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheLoadOnlyStoreAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheLoadOnlyStoreAdapter.java
index a0cbe65..ae4bb5d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheLoadOnlyStoreAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheLoadOnlyStoreAdapter.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.cache.store;
 
 import org.apache.ignite.*;
+import org.apache.ignite.cache.store.tx.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.resources.*;
@@ -259,6 +260,11 @@ public abstract class CacheLoadOnlyStoreAdapter<K, V, I> 
implements CacheStore<K
         // No-op.
     }
 
+    /** {@inheritDoc} */
+    @Override public CacheStoreSessionListener getSessionListener() {
+        return null;
+    }
+
     /**
      * Worker.
      */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b3d60836/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStore.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStore.java 
b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStore.java
index d018298..19c94d8 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStore.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.cache.store;
 
 import org.apache.ignite.cache.store.jdbc.*;
+import org.apache.ignite.cache.store.tx.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.transactions.*;
 import org.jetbrains.annotations.*;
@@ -94,6 +95,15 @@ public interface CacheStore<K, V> extends CacheLoader<K, V>, 
CacheWriter<K, V> {
      * @throws CacheWriterException If commit or rollback failed. Note that commit failure in some cases
      *      may bring cache transaction into {@link TransactionState#UNKNOWN} which will
      *      consequently cause all transacted entries to be invalidated.
+     * @deprecated Deprecated in favor of {@link CacheStoreSessionListener}.
      */
+    @Deprecated
     public void sessionEnd(boolean commit) throws CacheWriterException;
+
+    /**
+     * Gets session listener.
+     *
+     * @return Session listener.
+     */
+    public CacheStoreSessionListener getSessionListener();
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b3d60836/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreAdapter.java
index f7adf2c..0c66c86 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreAdapter.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.cache.store;
 
+import org.apache.ignite.cache.store.tx.*;
 import org.apache.ignite.lang.*;
 
 import javax.cache.*;
@@ -37,6 +38,23 @@ import java.util.*;
  * specific arguments.
  */
 public abstract class CacheStoreAdapter<K, V> implements CacheStore<K, V> {
+    /** Session listener. */
+    private CacheStoreSessionListener sesLsnr;
+
+    /**
+     * Sets session listener.
+     *
+     * @param sesLsnr Session listener.
+     */
+    public void setSessionListener(CacheStoreSessionListener sesLsnr) {
+        this.sesLsnr = sesLsnr;
+    }
+
+    /** {@inheritDoc} */
+    @Override public CacheStoreSessionListener getSessionListener() {
+        return sesLsnr;
+    }
+
     /**
      * Default empty implementation. This method needs to be overridden only if
      * {@link org.apache.ignite.IgniteCache#loadCache(IgniteBiPredicate, Object...)} method
@@ -87,7 +105,9 @@ public abstract class CacheStoreAdapter<K, V> implements 
CacheStore<K, V> {
      * for all other cases this method should be overridden with custom commit/rollback logic.
      *
      * @param commit {@inheritDoc}
+     * @deprecated {@inheritDoc}
      */
+    @Deprecated
     @Override public void sessionEnd(boolean commit) {
         // No-op.
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b3d60836/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
 
b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
index 22d6d7a..3f42407 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
@@ -21,6 +21,7 @@ import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
 import org.apache.ignite.cache.store.*;
 import org.apache.ignite.cache.store.jdbc.dialect.*;
+import org.apache.ignite.cache.store.tx.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.internal.util.typedef.*;
@@ -342,6 +343,11 @@ public abstract class CacheAbstractJdbcStore<K, V> 
implements CacheStore<K, V>,
         }
     }
 
+    /** {@inheritDoc} */
+    @Override public CacheStoreSessionListener getSessionListener() {
+        return null;
+    }
+
     /**
      * Retrieves the value of the designated column in the current row of this 
<code>ResultSet</code> object and
      * will convert to the requested Java data type.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b3d60836/modules/core/src/main/java/org/apache/ignite/cache/store/tx/CacheStoreJdbcSessionListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/tx/CacheStoreJdbcSessionListener.java b/modules/core/src/main/java/org/apache/ignite/cache/store/tx/CacheStoreJdbcSessionListener.java
new file mode 100644
index 0000000..3ad14ad
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/tx/CacheStoreJdbcSessionListener.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.tx;
+
+import org.apache.ignite.cache.store.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import javax.cache.*;
+import javax.sql.*;
+import java.sql.*;
+
+/**
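+ * Session listener that opens a JDBC connection on session start and commits or rolls it back on session end.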
+ */
+public class CacheStoreJdbcSessionListener implements CacheStoreSessionListener {
+    /** */
+    public static final String CONN_KEY = "__conn_";
+
+    /** */
+    private DataSource dataSrc;
+
+    public CacheStoreJdbcSessionListener(DataSource dataSrc) {
+        this.dataSrc = dataSrc;
+    }
+
+    public DataSource getDataSource() {
+        return dataSrc;
+    }
+
+    public void setDataSource(DataSource dataSrc) {
+        this.dataSrc = dataSrc;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onSessionStart(CacheStoreSession ses) {
+        assert !ses.properties().containsKey(CONN_KEY);
+
+        try {
+            Connection conn = dataSrc.getConnection();
+
+            conn.setAutoCommit(false);
+
+            ses.properties().put(CONN_KEY, conn);
+        }
+        catch (SQLException e) {
+            throw new CacheException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onSessionEnd(CacheStoreSession ses, boolean commit) {
+        Connection conn = ses.<String, Connection>properties().remove(CONN_KEY);
+
+        if (conn != null) {
+            try {
+                if (commit)
+                    conn.commit();
+                else
+                    conn.rollback();
+            }
+            catch (SQLException e) {
+                throw new CacheException(e);
+            }
+            finally {
+                U.closeQuiet(conn);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b3d60836/modules/core/src/main/java/org/apache/ignite/cache/store/tx/CacheStoreSessionListener.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/cache/store/tx/CacheStoreSessionListener.java
 
b/modules/core/src/main/java/org/apache/ignite/cache/store/tx/CacheStoreSessionListener.java
new file mode 100644
index 0000000..2157def
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/cache/store/tx/CacheStoreSessionListener.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.tx;
+
+import org.apache.ignite.cache.store.*;
+
+/**
+ * Listener notified when a cache store session starts and ends.
+ */
+public interface CacheStoreSessionListener {
+    /**
+     * Called when session is started.
+     *
+     * @param ses Store session.
+     */
+    public void onSessionStart(CacheStoreSession ses);
+
+    /**
+     * Called when session is finished.
+     *
+     * @param ses Store session.
+     * @param commit Whether transaction was committed.
+     */
+    public void onSessionEnd(CacheStoreSession ses, boolean commit);
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b3d60836/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
 
b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
index df6b2ee..692f1e3 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
@@ -774,7 +774,6 @@ public class CacheConfiguration<K, V> extends 
MutableConfiguration<K, V> {
      *
      * @param loadPrevVal Load previous value flag.
      * @return {@code this} for chaining.
-     * @return {@code this} for chaining.
      */
     public CacheConfiguration setLoadPreviousValue(boolean loadPrevVal) {
         this.loadPrevVal = loadPrevVal;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b3d60836/modules/core/src/main/java/org/apache/ignite/configuration/IgniteReflectionFactory.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteReflectionFactory.java
 
b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteReflectionFactory.java
index 3222938..59d8ae7 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteReflectionFactory.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteReflectionFactory.java
@@ -39,11 +39,18 @@ public class IgniteReflectionFactory<T> implements 
Factory<T> {
     private volatile Class<? extends T> cls;
 
     /** */
+    private volatile Object[] args;
+
+    /** */
     private volatile Map<String, Serializable> props;
 
     /** */
     private transient T instance;
 
+    public static <T> Factory<T> factoryOf(Class<? extends T> cls, Object... args) {
+        return new IgniteReflectionFactory<>(cls, args);
+    }
+
     /**
      *
      */
@@ -54,19 +61,22 @@ public class IgniteReflectionFactory<T> implements 
Factory<T> {
     /**
      * @param cls Component class.
      * @param singleton Singleton flag.
+     * @param args Constructor arguments.
      */
-    public IgniteReflectionFactory(Class<? extends T> cls, boolean singleton) {
+    public IgniteReflectionFactory(Class<? extends T> cls, boolean singleton, Object... args) {
         this.cls = cls;
         this.singleton = singleton;
+        this.args = args;
     }
 
     /**
      * Creates non-singleton component factory.
      *
      * @param cls Component class.
+     * @param args Constructor arguments.
      */
-    public IgniteReflectionFactory(Class<? extends T> cls) {
-        this(cls, false);
+    public IgniteReflectionFactory(Class<? extends T> cls, Object... args) {
+        this(cls, false, args);
     }
 
     /**
@@ -128,18 +138,37 @@ public class IgniteReflectionFactory<T> implements 
Factory<T> {
     /**
      * @return Initialized instance.
      */
+    @SuppressWarnings("unchecked")
     private T createInstance() {
         if (cls == null)
             throw new IllegalStateException("Failed to create object (object type is not set).");
 
         try {
-            T obj = cls.newInstance();
+            T obj = null;
+
+            if (args == null || args.length == 0)
+                obj = cls.newInstance();
+            else {
+                for (Constructor<?> cons : cls.getDeclaredConstructors()) {
+                    try {
+                        obj = (T)cons.newInstance(args);
+
+                        break;
+                    }
+                    catch (IllegalArgumentException | ClassCastException ignored) {
+                        // No-op.
+                    }
+                }
+
+                if (obj == null)
+                    throw new IllegalArgumentException("");
+            }
 
             injectProperties(obj);
 
             return obj;
         }
-        catch (InstantiationException | IllegalAccessException e) {
+        catch (InvocationTargetException | InstantiationException | IllegalAccessException e) {
             throw new CacheException("Failed to instantiate factory object: " + cls.getName(), e);
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b3d60836/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStoreBalancingWrapper.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStoreBalancingWrapper.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStoreBalancingWrapper.java
index 7bbf08b..8f6c0ff 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStoreBalancingWrapper.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStoreBalancingWrapper.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache;
 
 import org.apache.ignite.*;
 import org.apache.ignite.cache.store.*;
+import org.apache.ignite.cache.store.tx.*;
 import org.apache.ignite.internal.util.future.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.lang.*;
@@ -222,6 +223,11 @@ public class CacheStoreBalancingWrapper<K, V> implements 
CacheStore<K, V> {
         delegate.sessionEnd(commit);
     }
 
+    /** {@inheritDoc} */
+    @Override public CacheStoreSessionListener getSessionListener() {
+        return delegate.getSessionListener();
+    }
+
     /**
      *
      */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b3d60836/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheLoaderWriterStore.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheLoaderWriterStore.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheLoaderWriterStore.java
index 0413853..1c9c07f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheLoaderWriterStore.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheLoaderWriterStore.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.cache;
 
 import org.apache.ignite.cache.store.*;
+import org.apache.ignite.cache.store.tx.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.lifecycle.*;
 import org.jetbrains.annotations.*;
@@ -140,4 +141,9 @@ class GridCacheLoaderWriterStore<K, V> implements 
CacheStore<K, V>, LifecycleAwa
     @Override public void sessionEnd(boolean commit) {
         // No-op.
     }
+
+    /** {@inheritDoc} */
+    @Override public CacheStoreSessionListener getSessionListener() {
+        return null;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b3d60836/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
index f9a966c..a090f8a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.store;
 
 import org.apache.ignite.*;
 import org.apache.ignite.cache.store.*;
+import org.apache.ignite.cache.store.tx.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.processors.cache.*;
@@ -59,6 +60,9 @@ public abstract class GridCacheStoreManagerAdapter extends 
GridCacheManagerAdapt
     private ThreadLocal<SessionData> sesHolder;
 
     /** */
+    private ThreadLocalSession locSes;
+
+    /** */
     private boolean locStore;
 
     /** */
@@ -84,14 +88,15 @@ public abstract class GridCacheStoreManagerAdapter extends 
GridCacheManagerAdapt
             sesHolder0 = ((Map<CacheStore, 
ThreadLocal>)sesHolders).get(cfgStore);
 
             if (sesHolder0 == null) {
-                ThreadLocalSession locSes = new ThreadLocalSession();
+                sesHolder0 = new ThreadLocal<>();
 
-                if (ctx.resource().injectStoreSession(cfgStore, locSes)) {
-                    sesHolder0 = locSes.sesHolder;
+                locSes = new ThreadLocalSession(sesHolder0);
 
+                if (ctx.resource().injectStoreSession(cfgStore, locSes))
                     sesHolders.put(cfgStore, sesHolder0);
-                }
             }
+            else
+                locSes = new ThreadLocalSession(sesHolder0);
         }
 
         sesHolder = sesHolder0;
@@ -215,14 +220,14 @@ public abstract class GridCacheStoreManagerAdapter 
extends GridCacheManagerAdapt
 
             sessionInit0(tx);
 
-            boolean thewEx = true;
+            boolean threwEx = true;
 
             Object val = null;
 
             try {
                 val = singleThreadGate.load(storeKey);
 
-                thewEx = false;
+                threwEx = false;
             }
             catch (ClassCastException e) {
                 handleClassCastException(e);
@@ -234,7 +239,7 @@ public abstract class GridCacheStoreManagerAdapter extends 
GridCacheManagerAdapt
                 throw new IgniteCheckedException(new CacheLoaderException(e));
             }
             finally {
-                sessionEnd0(tx, thewEx);
+                sessionEnd0(tx, threwEx);
             }
 
             if (log.isDebugEnabled())
@@ -349,7 +354,7 @@ public abstract class GridCacheStoreManagerAdapter extends 
GridCacheManagerAdapt
 
             sessionInit0(tx);
 
-            boolean thewEx = true;
+            boolean threwEx = true;
 
             try {
                 IgniteBiInClosure<Object, Object> c = new CI2<Object, 
Object>() {
@@ -380,7 +385,7 @@ public abstract class GridCacheStoreManagerAdapter extends 
GridCacheManagerAdapt
                 else
                     singleThreadGate.loadAll(keys0, c);
 
-                thewEx = false;
+                threwEx = false;
             }
             catch (ClassCastException e) {
                 handleClassCastException(e);
@@ -392,7 +397,7 @@ public abstract class GridCacheStoreManagerAdapter extends 
GridCacheManagerAdapt
                 throw new IgniteCheckedException(new CacheLoaderException(e));
             }
             finally {
-                sessionEnd0(tx, thewEx);
+                sessionEnd0(tx, threwEx);
             }
 
             if (log.isDebugEnabled())
@@ -408,7 +413,7 @@ public abstract class GridCacheStoreManagerAdapter extends 
GridCacheManagerAdapt
 
             sessionInit0(null);
 
-            boolean thewEx = true;
+            boolean threwEx = true;
 
             try {
                 store.loadCache(new IgniteBiInClosure<Object, Object>() {
@@ -431,7 +436,7 @@ public abstract class GridCacheStoreManagerAdapter extends 
GridCacheManagerAdapt
                     }
                 }, args);
 
-                thewEx = false;
+                threwEx = false;
             }
             catch (CacheLoaderException e) {
                 throw new IgniteCheckedException(e);
@@ -440,7 +445,7 @@ public abstract class GridCacheStoreManagerAdapter extends 
GridCacheManagerAdapt
                 throw new IgniteCheckedException(new CacheLoaderException(e));
             }
             finally {
-                sessionEnd0(null, thewEx);
+                sessionEnd0(null, threwEx);
             }
 
             if (log.isDebugEnabled())
@@ -473,12 +478,12 @@ public abstract class GridCacheStoreManagerAdapter 
extends GridCacheManagerAdapt
 
             sessionInit0(tx);
 
-            boolean thewEx = true;
+            boolean threwEx = true;
 
             try {
                 store.write(new CacheEntryImpl<>(key, locStore ? F.t(val, ver) 
: val));
 
-                thewEx = false;
+                threwEx = false;
             }
             catch (ClassCastException e) {
                 handleClassCastException(e);
@@ -490,7 +495,7 @@ public abstract class GridCacheStoreManagerAdapter extends 
GridCacheManagerAdapt
                 throw new IgniteCheckedException(new CacheWriterException(e));
             }
             finally {
-                sessionEnd0(tx, thewEx);
+                sessionEnd0(tx, threwEx);
             }
 
             if (log.isDebugEnabled())
@@ -522,12 +527,12 @@ public abstract class GridCacheStoreManagerAdapter 
extends GridCacheManagerAdapt
 
                 sessionInit0(tx);
 
-                boolean thewEx = true;
+                boolean threwEx = true;
 
                 try {
                     store.writeAll(entries);
 
-                    thewEx = false;
+                    threwEx = false;
                 }
                 catch (ClassCastException e) {
                     handleClassCastException(e);
@@ -548,7 +553,7 @@ public abstract class GridCacheStoreManagerAdapter extends 
GridCacheManagerAdapt
                     throw new IgniteCheckedException(e);
                 }
                 finally {
-                    sessionEnd0(tx, thewEx);
+                    sessionEnd0(tx, threwEx);
                 }
 
                 if (log.isDebugEnabled())
@@ -576,12 +581,12 @@ public abstract class GridCacheStoreManagerAdapter 
extends GridCacheManagerAdapt
 
             sessionInit0(tx);
 
-            boolean thewEx = true;
+            boolean threwEx = true;
 
             try {
                 store.delete(key);
 
-                thewEx = false;
+                threwEx = false;
             }
             catch (ClassCastException e) {
                 handleClassCastException(e);
@@ -593,7 +598,7 @@ public abstract class GridCacheStoreManagerAdapter extends 
GridCacheManagerAdapt
                 throw new IgniteCheckedException(new CacheWriterException(e));
             }
             finally {
-                sessionEnd0(tx, thewEx);
+                sessionEnd0(tx, threwEx);
             }
 
             if (log.isDebugEnabled())
@@ -625,12 +630,12 @@ public abstract class GridCacheStoreManagerAdapter 
extends GridCacheManagerAdapt
 
             sessionInit0(tx);
 
-            boolean thewEx = true;
+            boolean threwEx = true;
 
             try {
                 store.deleteAll(keys0);
 
-                thewEx = false;
+                threwEx = false;
             }
             catch (ClassCastException e) {
                 handleClassCastException(e);
@@ -645,7 +650,7 @@ public abstract class GridCacheStoreManagerAdapter extends 
GridCacheManagerAdapt
                 throw new IgniteCheckedException(e);
             }
             finally {
-                sessionEnd0(tx, thewEx);
+                sessionEnd0(tx, threwEx);
             }
 
             if (log.isDebugEnabled())
@@ -675,7 +680,12 @@ public abstract class GridCacheStoreManagerAdapter extends 
GridCacheManagerAdapt
         sessionInit0(tx);
 
         try {
-            store.sessionEnd(commit);
+            CacheStoreSessionListener sesLsnr = store.getSessionListener();
+
+            if (sesLsnr != null)
+                sesLsnr.onSessionEnd(locSes, commit);
+            else
+                store.sessionEnd(commit);
         }
         finally {
             if (sesHolder != null) {
@@ -715,11 +725,11 @@ public abstract class GridCacheStoreManagerAdapter 
extends GridCacheManagerAdapt
      * @param tx Current transaction.
      */
     private void sessionInit0(@Nullable IgniteInternalTx tx) {
-        if (sesHolder == null)
-            return;
-
+        assert sesHolder != null;
         assert sesHolder.get() == null;
 
+        CacheStoreSessionListener sesLsnr = store.getSessionListener();
+
         SessionData ses;
 
         if (tx != null) {
@@ -729,14 +739,21 @@ public abstract class GridCacheStoreManagerAdapter 
extends GridCacheManagerAdapt
                 ses = new SessionData(tx, cctx.name());
 
                 tx.addMeta(SES_ATTR, ses);
+
+                if (sesLsnr != null)
+                    sesLsnr.onSessionStart(locSes);
             }
             else
                 // Session cache name may change in cross-cache transaction.
                 ses.cacheName(cctx.name());
         }
-        else
+        else {
             ses = new SessionData(null, cctx.name());
 
+            if (sesLsnr != null)
+                sesLsnr.onSessionStart(locSes);
+        }
+
         sesHolder.set(ses);
     }
 
@@ -745,8 +762,14 @@ public abstract class GridCacheStoreManagerAdapter extends 
GridCacheManagerAdapt
      */
     private void sessionEnd0(@Nullable IgniteInternalTx tx, boolean threwEx) 
throws IgniteCheckedException {
         try {
-            if (tx == null)
-                store.sessionEnd(threwEx);
+            if (tx == null) {
+                CacheStoreSessionListener sesLsnr = store.getSessionListener();
+
+                if (sesLsnr != null)
+                    sesLsnr.onSessionEnd(locSes, !threwEx);
+                else
+                    store.sessionEnd(!threwEx);
+            }
         }
         catch (Exception e) {
             if (!threwEx)
@@ -839,7 +862,14 @@ public abstract class GridCacheStoreManagerAdapter extends 
GridCacheManagerAdapt
      */
     private static class ThreadLocalSession implements CacheStoreSession {
         /** */
-        private final ThreadLocal<SessionData> sesHolder = new ThreadLocal<>();
+        private final ThreadLocal<SessionData> sesHolder;
+
+        /**
+         * @param sesHolder Thread-local session data holder shared with the enclosing store manager.
+         */
+        private ThreadLocalSession(ThreadLocal<SessionData> sesHolder) {
+            this.sesHolder = sesHolder;
+        }
 
         /** {@inheritDoc} */
         @Nullable @Override public Transaction transaction() {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b3d60836/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java
index 5ce42f9..2f1dd89 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.store;
 
 import org.apache.ignite.*;
 import org.apache.ignite.cache.store.*;
+import org.apache.ignite.cache.store.tx.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.util.tostring.*;
@@ -473,6 +474,11 @@ public class GridCacheWriteBehindStore<K, V> implements 
CacheStore<K, V>, Lifecy
     }
 
     /** {@inheritDoc} */
+    @Override public CacheStoreSessionListener getSessionListener() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridCacheWriteBehindStore.class, this);
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b3d60836/modules/core/src/test/java/org/apache/ignite/cache/store/GridCacheBalancingStoreSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/cache/store/GridCacheBalancingStoreSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/cache/store/GridCacheBalancingStoreSelfTest.java
index b14ac5f..8a127f0 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/cache/store/GridCacheBalancingStoreSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/cache/store/GridCacheBalancingStoreSelfTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.cache.store;
 
+import org.apache.ignite.cache.store.tx.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.util.typedef.*;
@@ -197,5 +198,10 @@ public class GridCacheBalancingStoreSelfTest extends 
GridCommonAbstractTest {
         @Override public void sessionEnd(boolean commit) {
             // No-op.
         }
+
+        /** {@inheritDoc} */
+        @Override public CacheStoreSessionListener getSessionListener() {
+            return null;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b3d60836/modules/core/src/test/java/org/apache/ignite/cache/store/GridGeneratingTestStore.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/cache/store/GridGeneratingTestStore.java
 
b/modules/core/src/test/java/org/apache/ignite/cache/store/GridGeneratingTestStore.java
index 7e9c82e..ba08274 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/cache/store/GridGeneratingTestStore.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/cache/store/GridGeneratingTestStore.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.cache.store;
 
+import org.apache.ignite.cache.store.tx.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.resources.*;
@@ -98,4 +99,9 @@ public class GridGeneratingTestStore implements 
CacheStore<String, String> {
     @Override public void sessionEnd(boolean commit) {
         // No-op.
     }
+
+    /** {@inheritDoc} */
+    @Override public CacheStoreSessionListener getSessionListener() {
+        return null;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b3d60836/modules/core/src/test/java/org/apache/ignite/cache/store/IgniteCacheExpiryStoreLoadSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/cache/store/IgniteCacheExpiryStoreLoadSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/cache/store/IgniteCacheExpiryStoreLoadSelfTest.java
index a3a5717..b7e95fd 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/cache/store/IgniteCacheExpiryStoreLoadSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/cache/store/IgniteCacheExpiryStoreLoadSelfTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.cache.store;
 
 import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.store.tx.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.lang.*;
@@ -233,5 +234,10 @@ public class IgniteCacheExpiryStoreLoadSelfTest extends 
GridCacheAbstractSelfTes
         @Override public void deleteAll(Collection<?> keys) {
             // No-op.
         }
+
+        /** {@inheritDoc} */
+        @Override public CacheStoreSessionListener getSessionListener() {
+            return null;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b3d60836/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractLocalStoreSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractLocalStoreSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractLocalStoreSelfTest.java
index 157ba93..2af1696 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractLocalStoreSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractLocalStoreSelfTest.java
@@ -21,6 +21,7 @@ import com.google.common.collect.*;
 import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
 import org.apache.ignite.cache.store.*;
+import org.apache.ignite.cache.store.tx.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.events.*;
 import org.apache.ignite.internal.processors.cache.store.*;
@@ -387,6 +388,11 @@ public abstract class GridCacheAbstractLocalStoreSelfTest 
extends GridCommonAbst
         }
 
         /** {@inheritDoc} */
+        @Override public CacheStoreSessionListener getSessionListener() {
+            return null;
+        }
+
+        /** {@inheritDoc} */
         @Override public IgniteBiTuple<V, ?> load(K key) throws 
CacheLoaderException {
             return map.get(key);
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b3d60836/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConfigurationConsistencySelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConfigurationConsistencySelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConfigurationConsistencySelfTest.java
index 44171a8..14f8289 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConfigurationConsistencySelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConfigurationConsistencySelfTest.java
@@ -27,6 +27,7 @@ import org.apache.ignite.cache.eviction.fifo.*;
 import org.apache.ignite.cache.eviction.lru.*;
 import org.apache.ignite.cache.eviction.random.*;
 import org.apache.ignite.cache.store.*;
+import org.apache.ignite.cache.store.tx.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.util.typedef.*;
@@ -893,6 +894,11 @@ public class GridCacheConfigurationConsistencySelfTest 
extends GridCommonAbstrac
         @Override public void sessionEnd(boolean commit) {
             // No-op.
         }
+
+        /** {@inheritDoc} */
+        @Override public CacheStoreSessionListener getSessionListener() {
+            return null;
+        }
     }
 
     private static class TestRendezvousAffinityFunction extends 
RendezvousAffinityFunction {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b3d60836/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheGenericTestStore.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheGenericTestStore.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheGenericTestStore.java
index bc54951..5baced2 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheGenericTestStore.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheGenericTestStore.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache;
 
 import org.apache.ignite.*;
 import org.apache.ignite.cache.store.*;
+import org.apache.ignite.cache.store.tx.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
@@ -255,6 +256,11 @@ public class GridCacheGenericTestStore<K, V> implements 
CacheStore<K, V> {
         // No-op.
     }
 
+    /** {@inheritDoc} */
+    @Override public CacheStoreSessionListener getSessionListener() {
+        return null;
+    }
+
     /**
      * Checks the flag and throws exception if it is set. Checks operation 
delay and sleeps
      * for specified amount of time, if needed.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b3d60836/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLifecycleAwareSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLifecycleAwareSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLifecycleAwareSelfTest.java
index de5899d..a2b684c 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLifecycleAwareSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLifecycleAwareSelfTest.java
@@ -21,6 +21,7 @@ import org.apache.ignite.cache.*;
 import org.apache.ignite.cache.affinity.*;
 import org.apache.ignite.cache.eviction.*;
 import org.apache.ignite.cache.store.*;
+import org.apache.ignite.cache.store.tx.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.lang.*;
@@ -113,6 +114,11 @@ public class GridCacheLifecycleAwareSelfTest extends 
GridAbstractLifecycleAwareS
         @Override public void sessionEnd(boolean commit) {
             // No-op.
         }
+
+        /** {@inheritDoc} */
+        @Override public CacheStoreSessionListener getSessionListener() {
+            return null;
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b3d60836/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheStorePutxSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheStorePutxSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheStorePutxSelfTest.java
index a5ed22a..50b515e 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheStorePutxSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheStorePutxSelfTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache;
 
 import org.apache.ignite.*;
 import org.apache.ignite.cache.store.*;
+import org.apache.ignite.cache.store.tx.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.spi.discovery.tcp.*;
@@ -28,7 +29,6 @@ import org.apache.ignite.testframework.junits.common.*;
 import org.apache.ignite.transactions.*;
 import org.jetbrains.annotations.*;
 
-import javax.cache.configuration.*;
 import java.util.*;
 import java.util.concurrent.atomic.*;
 
@@ -154,5 +154,10 @@ public class GridCacheStorePutxSelfTest extends 
GridCommonAbstractTest {
         @Override public void sessionEnd(boolean commit) {
             // No-op.
         }
+
+        /** {@inheritDoc} */
+        @Override public CacheStoreSessionListener getSessionListener() {
+            return null;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b3d60836/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestStore.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestStore.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestStore.java
index 1129e5a..7a60e02 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestStore.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestStore.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache;
 
 import org.apache.ignite.*;
 import org.apache.ignite.cache.store.*;
+import org.apache.ignite.cache.store.tx.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.processors.cache.transactions.*;
 import org.apache.ignite.internal.util.*;
@@ -303,6 +304,11 @@ public final class GridCacheTestStore implements 
CacheStore<Integer, String> {
         // No-op.
     }
 
+    /** {@inheritDoc} */
+    @Override public CacheStoreSessionListener getSessionListener() {
+        return null;
+    }
+
     /**
      * Checks the flag and throws exception if it is set. Checks operation 
delay and sleeps
      * for specified amount of time, if needed.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b3d60836/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxStoreExceptionAbstractSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxStoreExceptionAbstractSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxStoreExceptionAbstractSelfTest.java
index 0f2acda..d81800b 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxStoreExceptionAbstractSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxStoreExceptionAbstractSelfTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache;
 
 import org.apache.ignite.*;
 import org.apache.ignite.cache.store.*;
+import org.apache.ignite.cache.store.tx.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.*;
@@ -660,5 +661,10 @@ public abstract class 
IgniteTxStoreExceptionAbstractSelfTest extends GridCacheAb
             if (fail && commit)
                 throw new CacheWriterException("Store exception");
         }
+
+        /** {@inheritDoc} */
+        @Override public CacheStoreSessionListener getSessionListener() {
+            return null;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b3d60836/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java
index cb32b13..bf09f26 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java
@@ -19,6 +19,7 @@ package 
org.apache.ignite.internal.processors.cache.distributed;
 
 import org.apache.ignite.*;
 import org.apache.ignite.cache.store.*;
+import org.apache.ignite.cache.store.tx.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.util.typedef.*;
@@ -290,6 +291,11 @@ public class IgniteCrossCacheTxStoreSelfTest extends 
GridCommonAbstractTest {
             evts.add("deleteAll " + cacheName + " " + keys.size());
         }
 
+        /** {@inheritDoc} */
+        @Override public CacheStoreSessionListener getSessionListener() {
+            return null;
+        }
+
         /**
          * @return Store session.
          */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b3d60836/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheStoreSessionAbstractTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheStoreSessionAbstractTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheStoreSessionAbstractTest.java
index 7784003..1755a0b 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheStoreSessionAbstractTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheStoreSessionAbstractTest.java
@@ -19,6 +19,7 @@ package 
org.apache.ignite.internal.processors.cache.integration;
 
 import org.apache.ignite.*;
 import org.apache.ignite.cache.store.*;
+import org.apache.ignite.cache.store.tx.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.util.typedef.*;
@@ -311,6 +312,11 @@ public abstract class IgniteCacheStoreSessionAbstractTest 
extends IgniteCacheAbs
             checkSession("deleteAll");
         }
 
+        /** {@inheritDoc} */
+        @Override public CacheStoreSessionListener getSessionListener() {
+            return null;
+        }
+
         /**
          * @return Store session.
          */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b3d60836/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheStoreSessionWriteBehindAbstractTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheStoreSessionWriteBehindAbstractTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheStoreSessionWriteBehindAbstractTest.java
index d846881..4e4c74a 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheStoreSessionWriteBehindAbstractTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheStoreSessionWriteBehindAbstractTest.java
@@ -20,6 +20,7 @@ package 
org.apache.ignite.internal.processors.cache.integration;
 import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
 import org.apache.ignite.cache.store.*;
+import org.apache.ignite.cache.store.tx.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.lang.*;
@@ -204,6 +205,11 @@ public abstract class 
IgniteCacheStoreSessionWriteBehindAbstractTest extends Ign
             checkSession("deleteAll");
         }
 
+        /** {@inheritDoc} */
+        @Override public CacheStoreSessionListener getSessionListener() {
+            return null;
+        }
+
         /**
          * @return Store session.
          */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b3d60836/modules/spring/pom.xml
----------------------------------------------------------------------
diff --git a/modules/spring/pom.xml b/modules/spring/pom.xml
index 8494ad0..dbd1f0a 100644
--- a/modules/spring/pom.xml
+++ b/modules/spring/pom.xml
@@ -77,6 +77,12 @@
         </dependency>
 
         <dependency>
+            <groupId>org.springframework</groupId>
+            <artifactId>spring-jdbc</artifactId>
+            <version>${spring.version}</version>
+        </dependency>
+
+        <dependency>
             <groupId>commons-logging</groupId>
             <artifactId>commons-logging</artifactId>
             <version>1.1.1</version>

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b3d60836/modules/spring/src/main/java/org/apache/ignite/cache/store/tx/CacheStoreSpringSessionListener.java
----------------------------------------------------------------------
diff --git 
a/modules/spring/src/main/java/org/apache/ignite/cache/store/tx/CacheStoreSpringSessionListener.java
 
b/modules/spring/src/main/java/org/apache/ignite/cache/store/tx/CacheStoreSpringSessionListener.java
new file mode 100644
index 0000000..d3be25c
--- /dev/null
+++ 
b/modules/spring/src/main/java/org/apache/ignite/cache/store/tx/CacheStoreSpringSessionListener.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.tx;
+
+import org.apache.ignite.cache.store.*;
+import org.springframework.transaction.*;
+import org.springframework.transaction.support.*;
+
+/**
+ * Cache store session listener that starts a Spring transaction on session start and completes it on session end.
+ */
+public class CacheStoreSpringSessionListener implements CacheStoreSessionListener {
+    /** Session property key under which the {@link TransactionStatus} of the started transaction is stored. */
+    public static final String TX_STATUS_KEY = "__tx_status_";
+
+    /** Transaction manager used to start, commit and roll back transactions. */
+    private PlatformTransactionManager txMgr;
+
+    /** @return Transaction manager, or {@code null} if none has been set. */
+    public PlatformTransactionManager getTransactionManager() {
+        return txMgr;
+    }
+
+    /** @param txMgr Transaction manager; must be set before the first session starts. */
+    public void setTransactionManager(PlatformTransactionManager txMgr) {
+        this.txMgr = txMgr;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onSessionStart(CacheStoreSession ses) {
+        assert ses.isWithinTransaction();
+        assert !ses.properties().containsKey(TX_STATUS_KEY);
+
+        DefaultTransactionDefinition def = new DefaultTransactionDefinition();
+        def.setIsolationLevel(txIsolation(ses));
+        ses.properties().put(TX_STATUS_KEY, txMgr.getTransaction(def));
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onSessionEnd(CacheStoreSession ses, boolean commit) {
+        // Remove, not just read: the status must not linger in the session, even if commit/rollback throws.
+        TransactionStatus tx = ses.<String, TransactionStatus>properties().remove(TX_STATUS_KEY);
+        if (tx != null) {
+            if (commit)
+                txMgr.commit(tx);
+            else
+                txMgr.rollback(tx);
+        }
+    }
+
+    /**
+     * Gets DB transaction isolation level based on ongoing cache transaction isolation.
+     *
+     * @param ses Store session whose cache transaction isolation is mapped.
+     * @return DB transaction isolation, as a Spring {@link TransactionDefinition} isolation constant.
+     */
+    private int txIsolation(CacheStoreSession ses) {
+        switch (ses.transaction().isolation()) {
+            case READ_COMMITTED:
+                return TransactionDefinition.ISOLATION_READ_COMMITTED;
+            case REPEATABLE_READ:
+                return TransactionDefinition.ISOLATION_REPEATABLE_READ;
+            case SERIALIZABLE:
+                return TransactionDefinition.ISOLATION_SERIALIZABLE;
+            default:
+                throw new IllegalStateException("Unsupported isolation: " + ses.transaction().isolation());
+        }
+    }
+}

Reply via email to