Repository: incubator-ignite Updated Branches: refs/heads/ignite-891 [created] b3d608367
# IGNITE-891 - Cache store refactoring Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b3d60836 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b3d60836 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b3d60836 Branch: refs/heads/ignite-891 Commit: b3d6083673b4226e1e64f1448b24bd2a82beac91 Parents: f6012f1 Author: Valentin Kulichenko <vkuliche...@gridgain.com> Authored: Mon May 11 23:32:41 2015 -0700 Committer: Valentin Kulichenko <vkuliche...@gridgain.com> Committed: Mon May 11 23:32:41 2015 -0700 ---------------------------------------------------------------------- .../store/jdbc/CacheJdbcPersonStore.java | 146 +++++-------------- .../cache/store/CacheLoadOnlyStoreAdapter.java | 6 + .../apache/ignite/cache/store/CacheStore.java | 10 ++ .../ignite/cache/store/CacheStoreAdapter.java | 20 +++ .../store/jdbc/CacheAbstractJdbcStore.java | 6 + .../store/tx/CacheStoreJdbcSessionListener.java | 84 +++++++++++ .../store/tx/CacheStoreSessionListener.java | 40 +++++ .../configuration/CacheConfiguration.java | 1 - .../configuration/IgniteReflectionFactory.java | 39 ++++- .../cache/CacheStoreBalancingWrapper.java | 6 + .../cache/GridCacheLoaderWriterStore.java | 6 + .../store/GridCacheStoreManagerAdapter.java | 96 +++++++----- .../cache/store/GridCacheWriteBehindStore.java | 6 + .../store/GridCacheBalancingStoreSelfTest.java | 6 + .../cache/store/GridGeneratingTestStore.java | 6 + .../IgniteCacheExpiryStoreLoadSelfTest.java | 6 + .../GridCacheAbstractLocalStoreSelfTest.java | 6 + ...idCacheConfigurationConsistencySelfTest.java | 6 + .../cache/GridCacheGenericTestStore.java | 6 + .../cache/GridCacheLifecycleAwareSelfTest.java | 6 + .../cache/GridCacheStorePutxSelfTest.java | 7 +- .../processors/cache/GridCacheTestStore.java | 6 + .../IgniteTxStoreExceptionAbstractSelfTest.java | 6 + .../IgniteCrossCacheTxStoreSelfTest.java | 6 + 
.../IgniteCacheStoreSessionAbstractTest.java | 6 + ...acheStoreSessionWriteBehindAbstractTest.java | 6 + modules/spring/pom.xml | 6 + .../tx/CacheStoreSpringSessionListener.java | 84 +++++++++++ 28 files changed, 482 insertions(+), 153 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b3d60836/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/CacheJdbcPersonStore.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/CacheJdbcPersonStore.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/CacheJdbcPersonStore.java index 791f861..d930615 100644 --- a/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/CacheJdbcPersonStore.java +++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/CacheJdbcPersonStore.java @@ -19,15 +19,16 @@ package org.apache.ignite.examples.datagrid.store.jdbc; import org.apache.ignite.*; import org.apache.ignite.cache.store.*; +import org.apache.ignite.cache.store.tx.*; import org.apache.ignite.examples.datagrid.store.*; import org.apache.ignite.lang.*; import org.apache.ignite.resources.*; -import org.jetbrains.annotations.*; +import org.springframework.jdbc.datasource.*; import javax.cache.*; import javax.cache.integration.*; +import javax.sql.*; import java.sql.*; -import java.util.*; /** * Example of {@link CacheStore} implementation that uses JDBC @@ -35,8 +36,11 @@ import java.util.*; * */ public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> { - /** Transaction metadata attribute name. */ - private static final String ATTR_NAME = "SIMPLE_STORE_CONNECTION"; + /** Database URL. */ + private static final String DB_URL = "jdbc:h2:mem:example;DB_CLOSE_DELAY=-1"; + + /** Data source. 
*/ + private DataSource dataSrc; /** Auto-injected store session. */ @CacheStoreSessionResource @@ -48,6 +52,10 @@ public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> { * @throws IgniteException If failed. */ public CacheJdbcPersonStore() throws IgniteException { + dataSrc = new DriverManagerDataSource(DB_URL); + + setSessionListener(new CacheStoreJdbcSessionListener(dataSrc)); + prepareDb(); } @@ -58,11 +66,9 @@ public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> { * @throws IgniteException If failed. */ private void prepareDb() throws IgniteException { - try (Connection conn = openConnection(false); Statement st = conn.createStatement()) { + try (Connection conn = connection(); Statement st = conn.createStatement()) { st.execute("create table if not exists PERSONS (id number unique, firstName varchar(255), " + - "lastName varchar(255))"); - - conn.commit(); + "" + "lastName varchar(255))"); } catch (SQLException e) { throw new IgniteException("Failed to create database table.", e); @@ -70,32 +76,11 @@ public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> { } /** {@inheritDoc} */ - @Override public void sessionEnd(boolean commit) { - Map<String, Connection> props = ses.properties(); - - try (Connection conn = props.remove(ATTR_NAME)) { - if (conn != null) { - if (commit) - conn.commit(); - else - conn.rollback(); - } - - System.out.println(">>> Transaction ended [commit=" + commit + ']'); - } - catch (SQLException e) { - throw new CacheWriterException("Failed to end transaction: " + ses.transaction(), e); - } - } - - /** {@inheritDoc} */ @Override public Person load(Long key) { System.out.println(">>> Loading key: " + key); - Connection conn = null; - try { - conn = connection(); + Connection conn = connection(); try (PreparedStatement st = conn.prepareStatement("select * from PERSONS where id=?")) { st.setString(1, key.toString()); @@ -109,9 +94,6 @@ public class CacheJdbcPersonStore extends 
CacheStoreAdapter<Long, Person> { catch (SQLException e) { throw new CacheLoaderException("Failed to load object: " + key, e); } - finally { - end(conn); - } return null; } @@ -124,10 +106,8 @@ public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> { System.out.println(">>> Putting [key=" + key + ", val=" + val + ']'); - Connection conn = null; - try { - conn = connection(); + Connection conn = connection(); int updated; @@ -157,19 +137,14 @@ public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> { catch (SQLException e) { throw new CacheLoaderException("Failed to put object [key=" + key + ", val=" + val + ']', e); } - finally { - end(conn); - } } /** {@inheritDoc} */ @Override public void delete(Object key) { System.out.println(">>> Removing key: " + key); - Connection conn = null; - try { - conn = connection(); + Connection conn = connection(); try (PreparedStatement st = conn.prepareStatement("delete from PERSONS where id=?")) { st.setLong(1, (Long)key); @@ -180,9 +155,6 @@ public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> { catch (SQLException e) { throw new CacheWriterException("Failed to remove object: " + key, e); } - finally { - end(conn); - } } /** {@inheritDoc} */ @@ -192,84 +164,32 @@ public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> { final int entryCnt = (Integer)args[0]; - try (Connection conn = connection()) { - try (PreparedStatement st = conn.prepareStatement("select * from PERSONS")) { - try (ResultSet rs = st.executeQuery()) { - int cnt = 0; - - while (cnt < entryCnt && rs.next()) { - Person person = new Person(rs.getLong(1), rs.getString(2), rs.getString(3)); + try ( + Connection conn = connection(); + PreparedStatement st = conn.prepareStatement("select * from PERSONS"); + ResultSet rs = st.executeQuery() + ) { + int cnt = 0; - clo.apply(person.getId(), person); + while (cnt < entryCnt && rs.next()) { + Person person = new Person(rs.getLong(1), 
rs.getString(2), rs.getString(3)); - cnt++; - } + clo.apply(person.getId(), person); - System.out.println(">>> Loaded " + cnt + " values into cache."); - } + cnt++; } + + System.out.println(">>> Loaded " + cnt + " values into cache."); } catch (SQLException e) { throw new CacheLoaderException("Failed to load values from cache store.", e); } } - /** - * @return Connection. - * @throws SQLException In case of error. - */ - private Connection connection() throws SQLException { - // If there is an ongoing transaction, - // we must reuse the same connection. - if (ses.isWithinTransaction()) { - Map<Object, Object> props = ses.properties(); - - Connection conn = (Connection)props.get(ATTR_NAME); - - if (conn == null) { - conn = openConnection(false); - - // Store connection in session properties, so it can be accessed - // for other operations on the same transaction. - props.put(ATTR_NAME, conn); - } - - return conn; - } - // Transaction can be null in case of simple load or put operation. + private Connection connection() throws SQLException { + if (ses != null && ses.isWithinTransaction()) + return ses.<String, Connection>properties().get(CacheStoreJdbcSessionListener.CONN_KEY); else - return openConnection(true); - } - - /** - * Closes allocated resources depending on transaction status. - * - * @param conn Allocated connection. - */ - private void end(@Nullable Connection conn) { - if (!ses.isWithinTransaction() && conn != null) { - // Close connection right away if there is no transaction. - try { - conn.close(); - } - catch (SQLException ignored) { - // No-op. - } - } - } - - /** - * Gets connection from a pool. - * - * @param autocommit {@code true} If connection should use autocommit mode. - * @return Pooled connection. - * @throws SQLException In case of error. 
- */ - private Connection openConnection(boolean autocommit) throws SQLException { - Connection conn = DriverManager.getConnection("jdbc:h2:mem:example;DB_CLOSE_DELAY=-1"); - - conn.setAutoCommit(autocommit); - - return conn; + return dataSrc.getConnection(); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b3d60836/modules/core/src/main/java/org/apache/ignite/cache/store/CacheLoadOnlyStoreAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheLoadOnlyStoreAdapter.java b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheLoadOnlyStoreAdapter.java index a0cbe65..ae4bb5d 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheLoadOnlyStoreAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheLoadOnlyStoreAdapter.java @@ -18,6 +18,7 @@ package org.apache.ignite.cache.store; import org.apache.ignite.*; +import org.apache.ignite.cache.store.tx.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; import org.apache.ignite.resources.*; @@ -259,6 +260,11 @@ public abstract class CacheLoadOnlyStoreAdapter<K, V, I> implements CacheStore<K // No-op. } + /** {@inheritDoc} */ + @Override public CacheStoreSessionListener getSessionListener() { + return null; + } + /** * Worker. 
*/ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b3d60836/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStore.java b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStore.java index d018298..19c94d8 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStore.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStore.java @@ -18,6 +18,7 @@ package org.apache.ignite.cache.store; import org.apache.ignite.cache.store.jdbc.*; +import org.apache.ignite.cache.store.tx.*; import org.apache.ignite.lang.*; import org.apache.ignite.transactions.*; import org.jetbrains.annotations.*; @@ -94,6 +95,15 @@ public interface CacheStore<K, V> extends CacheLoader<K, V>, CacheWriter<K, V> { * @throws CacheWriterException If commit or rollback failed. Note that commit failure in some cases * may bring cache transaction into {@link TransactionState#UNKNOWN} which will * consequently cause all transacted entries to be invalidated. + * @deprecated Deprecated in favor of {@link CacheStoreSessionListener}. */ + @Deprecated public void sessionEnd(boolean commit) throws CacheWriterException; + + /** + * Gets session listener. + * + * @return Session listener. 
+ */ + public CacheStoreSessionListener getSessionListener(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b3d60836/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreAdapter.java b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreAdapter.java index f7adf2c..0c66c86 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreAdapter.java @@ -17,6 +17,7 @@ package org.apache.ignite.cache.store; +import org.apache.ignite.cache.store.tx.*; import org.apache.ignite.lang.*; import javax.cache.*; @@ -37,6 +38,23 @@ import java.util.*; * specific arguments. */ public abstract class CacheStoreAdapter<K, V> implements CacheStore<K, V> { + /** Transaction manager. */ + private CacheStoreSessionListener sesLsnr; + + /** + * Sets session listener. + * + * @param sesLsnr Session listener. + */ + public void setSessionListener(CacheStoreSessionListener sesLsnr) { + this.sesLsnr = sesLsnr; + } + + /** {@inheritDoc} */ + @Override public CacheStoreSessionListener getSessionListener() { + return sesLsnr; + } + /** * Default empty implementation. This method needs to be overridden only if * {@link org.apache.ignite.IgniteCache#loadCache(IgniteBiPredicate, Object...)} method @@ -87,7 +105,9 @@ public abstract class CacheStoreAdapter<K, V> implements CacheStore<K, V> { * for all other cases this method should be overridden with custom commit/rollback logic. * * @param commit {@inheritDoc} + * @deprecated {@inheritDoc} */ + @Deprecated @Override public void sessionEnd(boolean commit) { // No-op. 
} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b3d60836/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java index 22d6d7a..3f42407 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java @@ -21,6 +21,7 @@ import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.cache.store.*; import org.apache.ignite.cache.store.jdbc.dialect.*; +import org.apache.ignite.cache.store.tx.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.*; @@ -342,6 +343,11 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>, } } + /** {@inheritDoc} */ + @Override public CacheStoreSessionListener getSessionListener() { + return null; + } + /** * Retrieves the value of the designated column in the current row of this <code>ResultSet</code> object and * will convert to the requested Java data type. 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b3d60836/modules/core/src/main/java/org/apache/ignite/cache/store/tx/CacheStoreJdbcSessionListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/tx/CacheStoreJdbcSessionListener.java b/modules/core/src/main/java/org/apache/ignite/cache/store/tx/CacheStoreJdbcSessionListener.java new file mode 100644 index 0000000..3ad14ad --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/tx/CacheStoreJdbcSessionListener.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.cache.store.tx; + +import org.apache.ignite.cache.store.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import javax.cache.*; +import javax.sql.*; +import java.sql.*; + +/** + * TODO + */ +public class CacheStoreJdbcSessionListener implements CacheStoreSessionListener { + /** */ + public static final String CONN_KEY = "__conn_"; + + /** */ + private DataSource dataSrc; + + public CacheStoreJdbcSessionListener(DataSource dataSrc) { + this.dataSrc = dataSrc; + } + + public DataSource getDataSource() { + return dataSrc; + } + + public void setDataSource(DataSource dataSrc) { + this.dataSrc = dataSrc; + } + + /** {@inheritDoc} */ + @Override public void onSessionStart(CacheStoreSession ses) { + assert !ses.properties().containsKey(CONN_KEY); + + try { + Connection conn = dataSrc.getConnection(); + + conn.setAutoCommit(false); + + ses.properties().put(CONN_KEY, conn); + } + catch (SQLException e) { + throw new CacheException(e); + } + } + + /** {@inheritDoc} */ + @Override public void onSessionEnd(CacheStoreSession ses, boolean commit) { + Connection conn = ses.<String, Connection>properties().remove(CONN_KEY); + + if (conn != null) { + try { + if (commit) + conn.commit(); + else + conn.rollback(); + } + catch (SQLException e) { + throw new CacheException(e); + } + finally { + U.closeQuiet(conn); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b3d60836/modules/core/src/main/java/org/apache/ignite/cache/store/tx/CacheStoreSessionListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/tx/CacheStoreSessionListener.java b/modules/core/src/main/java/org/apache/ignite/cache/store/tx/CacheStoreSessionListener.java new file mode 100644 index 0000000..2157def --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/tx/CacheStoreSessionListener.java @@ -0,0 +1,40 @@ +/* + * Licensed 
to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.store.tx; + +import org.apache.ignite.cache.store.*; + +/** + * Transaction manager for persistence store. + */ +public interface CacheStoreSessionListener { + /** + * Called when session is started. + * + * @param ses Store session. + */ + public void onSessionStart(CacheStoreSession ses); + + /** + * Called when session is finished. + * + * @param ses Store session. + * @param commit Whether transaction was committed. 
+ */ + public void onSessionEnd(CacheStoreSession ses, boolean commit); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b3d60836/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java index df6b2ee..692f1e3 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java @@ -774,7 +774,6 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> { * * @param loadPrevVal Load previous value flag. * @return {@code this} for chaining. - * @return {@code this} for chaining. */ public CacheConfiguration setLoadPreviousValue(boolean loadPrevVal) { this.loadPrevVal = loadPrevVal; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b3d60836/modules/core/src/main/java/org/apache/ignite/configuration/IgniteReflectionFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteReflectionFactory.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteReflectionFactory.java index 3222938..59d8ae7 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteReflectionFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteReflectionFactory.java @@ -39,11 +39,18 @@ public class IgniteReflectionFactory<T> implements Factory<T> { private volatile Class<? extends T> cls; /** */ + private volatile Object[] args; + + /** */ private volatile Map<String, Serializable> props; /** */ private transient T instance; + public static <T> Factory<T> factoryOf(Class<? extends T> cls, Object... 
args) { + return new IgniteReflectionFactory<>(cls, args); + } + /** * */ @@ -54,19 +61,22 @@ public class IgniteReflectionFactory<T> implements Factory<T> { /** * @param cls Component class. * @param singleton Singleton flag. + * @param args Constructor arguments. */ - public IgniteReflectionFactory(Class<? extends T> cls, boolean singleton) { + public IgniteReflectionFactory(Class<? extends T> cls, boolean singleton, Object... args) { this.cls = cls; this.singleton = singleton; + this.args = args; } /** * Creates non-singleton component factory. * * @param cls Component class. + * @param args Constructor arguments. */ - public IgniteReflectionFactory(Class<? extends T> cls) { - this(cls, false); + public IgniteReflectionFactory(Class<? extends T> cls, Object... args) { + this(cls, false, args); } /** @@ -128,18 +138,37 @@ public class IgniteReflectionFactory<T> implements Factory<T> { /** * @return Initialized instance. */ + @SuppressWarnings("unchecked") private T createInstance() { if (cls == null) throw new IllegalStateException("Failed to create object (object type is not set)."); try { - T obj = cls.newInstance(); + T obj = null; + + if (args == null || args.length == 0) + obj = cls.newInstance(); + else { + for (Constructor<?> cons : cls.getDeclaredConstructors()) { + try { + obj = (T)cons.newInstance(args); + + break; + } + catch (IllegalArgumentException | ClassCastException ignored) { + // No-op. 
+ } + } + + if (obj == null) + throw new IllegalArgumentException(""); + } injectProperties(obj); return obj; } - catch (InstantiationException | IllegalAccessException e) { + catch (InvocationTargetException | InstantiationException | IllegalAccessException e) { throw new CacheException("Failed to instantiate factory object: " + cls.getName(), e); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b3d60836/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStoreBalancingWrapper.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStoreBalancingWrapper.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStoreBalancingWrapper.java index 7bbf08b..8f6c0ff 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStoreBalancingWrapper.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStoreBalancingWrapper.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.*; import org.apache.ignite.cache.store.*; +import org.apache.ignite.cache.store.tx.*; import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.lang.*; @@ -222,6 +223,11 @@ public class CacheStoreBalancingWrapper<K, V> implements CacheStore<K, V> { delegate.sessionEnd(commit); } + /** {@inheritDoc} */ + @Override public CacheStoreSessionListener getSessionListener() { + return delegate.getSessionListener(); + } + /** * */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b3d60836/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheLoaderWriterStore.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheLoaderWriterStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheLoaderWriterStore.java index 0413853..1c9c07f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheLoaderWriterStore.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheLoaderWriterStore.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.cache.store.*; +import org.apache.ignite.cache.store.tx.*; import org.apache.ignite.lang.*; import org.apache.ignite.lifecycle.*; import org.jetbrains.annotations.*; @@ -140,4 +141,9 @@ class GridCacheLoaderWriterStore<K, V> implements CacheStore<K, V>, LifecycleAwa @Override public void sessionEnd(boolean commit) { // No-op. } + + /** {@inheritDoc} */ + @Override public CacheStoreSessionListener getSessionListener() { + return null; + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b3d60836/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java index f9a966c..a090f8a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.store; import org.apache.ignite.*; import org.apache.ignite.cache.store.*; +import org.apache.ignite.cache.store.tx.*; import org.apache.ignite.configuration.*; import 
org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.*; @@ -59,6 +60,9 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt private ThreadLocal<SessionData> sesHolder; /** */ + private ThreadLocalSession locSes; + + /** */ private boolean locStore; /** */ @@ -84,14 +88,15 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt sesHolder0 = ((Map<CacheStore, ThreadLocal>)sesHolders).get(cfgStore); if (sesHolder0 == null) { - ThreadLocalSession locSes = new ThreadLocalSession(); + sesHolder0 = new ThreadLocal<>(); - if (ctx.resource().injectStoreSession(cfgStore, locSes)) { - sesHolder0 = locSes.sesHolder; + locSes = new ThreadLocalSession(sesHolder0); + if (ctx.resource().injectStoreSession(cfgStore, locSes)) sesHolders.put(cfgStore, sesHolder0); - } } + else + locSes = new ThreadLocalSession(sesHolder0); } sesHolder = sesHolder0; @@ -215,14 +220,14 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt sessionInit0(tx); - boolean thewEx = true; + boolean threwEx = true; Object val = null; try { val = singleThreadGate.load(storeKey); - thewEx = false; + threwEx = false; } catch (ClassCastException e) { handleClassCastException(e); @@ -234,7 +239,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt throw new IgniteCheckedException(new CacheLoaderException(e)); } finally { - sessionEnd0(tx, thewEx); + sessionEnd0(tx, threwEx); } if (log.isDebugEnabled()) @@ -349,7 +354,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt sessionInit0(tx); - boolean thewEx = true; + boolean threwEx = true; try { IgniteBiInClosure<Object, Object> c = new CI2<Object, Object>() { @@ -380,7 +385,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt else singleThreadGate.loadAll(keys0, c); - thewEx = false; + threwEx = false; } catch (ClassCastException e) { 
handleClassCastException(e); @@ -392,7 +397,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt throw new IgniteCheckedException(new CacheLoaderException(e)); } finally { - sessionEnd0(tx, thewEx); + sessionEnd0(tx, threwEx); } if (log.isDebugEnabled()) @@ -408,7 +413,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt sessionInit0(null); - boolean thewEx = true; + boolean threwEx = true; try { store.loadCache(new IgniteBiInClosure<Object, Object>() { @@ -431,7 +436,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt } }, args); - thewEx = false; + threwEx = false; } catch (CacheLoaderException e) { throw new IgniteCheckedException(e); @@ -440,7 +445,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt throw new IgniteCheckedException(new CacheLoaderException(e)); } finally { - sessionEnd0(null, thewEx); + sessionEnd0(null, threwEx); } if (log.isDebugEnabled()) @@ -473,12 +478,12 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt sessionInit0(tx); - boolean thewEx = true; + boolean threwEx = true; try { store.write(new CacheEntryImpl<>(key, locStore ? 
F.t(val, ver) : val)); - thewEx = false; + threwEx = false; } catch (ClassCastException e) { handleClassCastException(e); @@ -490,7 +495,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt throw new IgniteCheckedException(new CacheWriterException(e)); } finally { - sessionEnd0(tx, thewEx); + sessionEnd0(tx, threwEx); } if (log.isDebugEnabled()) @@ -522,12 +527,12 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt sessionInit0(tx); - boolean thewEx = true; + boolean threwEx = true; try { store.writeAll(entries); - thewEx = false; + threwEx = false; } catch (ClassCastException e) { handleClassCastException(e); @@ -548,7 +553,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt throw new IgniteCheckedException(e); } finally { - sessionEnd0(tx, thewEx); + sessionEnd0(tx, threwEx); } if (log.isDebugEnabled()) @@ -576,12 +581,12 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt sessionInit0(tx); - boolean thewEx = true; + boolean threwEx = true; try { store.delete(key); - thewEx = false; + threwEx = false; } catch (ClassCastException e) { handleClassCastException(e); @@ -593,7 +598,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt throw new IgniteCheckedException(new CacheWriterException(e)); } finally { - sessionEnd0(tx, thewEx); + sessionEnd0(tx, threwEx); } if (log.isDebugEnabled()) @@ -625,12 +630,12 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt sessionInit0(tx); - boolean thewEx = true; + boolean threwEx = true; try { store.deleteAll(keys0); - thewEx = false; + threwEx = false; } catch (ClassCastException e) { handleClassCastException(e); @@ -645,7 +650,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt throw new IgniteCheckedException(e); } finally { - sessionEnd0(tx, thewEx); + sessionEnd0(tx, threwEx); } if 
(log.isDebugEnabled()) @@ -675,7 +680,12 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt sessionInit0(tx); try { - store.sessionEnd(commit); + CacheStoreSessionListener sesLsnr = store.getSessionListener(); + + if (sesLsnr != null) + sesLsnr.onSessionEnd(locSes, commit); + else + store.sessionEnd(commit); } finally { if (sesHolder != null) { @@ -715,11 +725,11 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt * @param tx Current transaction. */ private void sessionInit0(@Nullable IgniteInternalTx tx) { - if (sesHolder == null) - return; - + assert sesHolder != null; assert sesHolder.get() == null; + CacheStoreSessionListener sesLsnr = store.getSessionListener(); + SessionData ses; if (tx != null) { @@ -729,14 +739,21 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt ses = new SessionData(tx, cctx.name()); tx.addMeta(SES_ATTR, ses); + + if (sesLsnr != null) + sesLsnr.onSessionStart(locSes); } else // Session cache name may change in cross-cache transaction. 
ses.cacheName(cctx.name()); } - else + else { ses = new SessionData(null, cctx.name()); + if (sesLsnr != null) + sesLsnr.onSessionStart(locSes); + } + sesHolder.set(ses); } @@ -745,8 +762,14 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt */ private void sessionEnd0(@Nullable IgniteInternalTx tx, boolean threwEx) throws IgniteCheckedException { try { - if (tx == null) - store.sessionEnd(threwEx); + if (tx == null) { + CacheStoreSessionListener sesLsnr = store.getSessionListener(); + + if (sesLsnr != null) + sesLsnr.onSessionEnd(locSes, !threwEx); + else + store.sessionEnd(!threwEx); + } } catch (Exception e) { if (!threwEx) @@ -839,7 +862,14 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt */ private static class ThreadLocalSession implements CacheStoreSession { /** */ - private final ThreadLocal<SessionData> sesHolder = new ThreadLocal<>(); + private final ThreadLocal<SessionData> sesHolder; + + /** + * @param sesHolder Session holder. 
+ */ + private ThreadLocalSession(ThreadLocal<SessionData> sesHolder) { + this.sesHolder = sesHolder; + } /** {@inheritDoc} */ @Nullable @Override public Transaction transaction() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b3d60836/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java index 5ce42f9..2f1dd89 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.store; import org.apache.ignite.*; import org.apache.ignite.cache.store.*; +import org.apache.ignite.cache.store.tx.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.util.tostring.*; @@ -473,6 +474,11 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy } /** {@inheritDoc} */ + @Override public CacheStoreSessionListener getSessionListener() { + return null; + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridCacheWriteBehindStore.class, this); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b3d60836/modules/core/src/test/java/org/apache/ignite/cache/store/GridCacheBalancingStoreSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/GridCacheBalancingStoreSelfTest.java 
b/modules/core/src/test/java/org/apache/ignite/cache/store/GridCacheBalancingStoreSelfTest.java index b14ac5f..8a127f0 100644 --- a/modules/core/src/test/java/org/apache/ignite/cache/store/GridCacheBalancingStoreSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cache/store/GridCacheBalancingStoreSelfTest.java @@ -17,6 +17,7 @@ package org.apache.ignite.cache.store; +import org.apache.ignite.cache.store.tx.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.util.typedef.*; @@ -197,5 +198,10 @@ public class GridCacheBalancingStoreSelfTest extends GridCommonAbstractTest { @Override public void sessionEnd(boolean commit) { // No-op. } + + /** {@inheritDoc} */ + @Override public CacheStoreSessionListener getSessionListener() { + return null; + } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b3d60836/modules/core/src/test/java/org/apache/ignite/cache/store/GridGeneratingTestStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/GridGeneratingTestStore.java b/modules/core/src/test/java/org/apache/ignite/cache/store/GridGeneratingTestStore.java index 7e9c82e..ba08274 100644 --- a/modules/core/src/test/java/org/apache/ignite/cache/store/GridGeneratingTestStore.java +++ b/modules/core/src/test/java/org/apache/ignite/cache/store/GridGeneratingTestStore.java @@ -17,6 +17,7 @@ package org.apache.ignite.cache.store; +import org.apache.ignite.cache.store.tx.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.lang.*; import org.apache.ignite.resources.*; @@ -98,4 +99,9 @@ public class GridGeneratingTestStore implements CacheStore<String, String> { @Override public void sessionEnd(boolean commit) { // No-op. 
} + + /** {@inheritDoc} */ + @Override public CacheStoreSessionListener getSessionListener() { + return null; + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b3d60836/modules/core/src/test/java/org/apache/ignite/cache/store/IgniteCacheExpiryStoreLoadSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/IgniteCacheExpiryStoreLoadSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/IgniteCacheExpiryStoreLoadSelfTest.java index a3a5717..b7e95fd 100644 --- a/modules/core/src/test/java/org/apache/ignite/cache/store/IgniteCacheExpiryStoreLoadSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cache/store/IgniteCacheExpiryStoreLoadSelfTest.java @@ -19,6 +19,7 @@ package org.apache.ignite.cache.store; import org.apache.ignite.*; import org.apache.ignite.cache.*; +import org.apache.ignite.cache.store.tx.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.lang.*; @@ -233,5 +234,10 @@ public class IgniteCacheExpiryStoreLoadSelfTest extends GridCacheAbstractSelfTes @Override public void deleteAll(Collection<?> keys) { // No-op. 
} + + /** {@inheritDoc} */ + @Override public CacheStoreSessionListener getSessionListener() { + return null; + } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b3d60836/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractLocalStoreSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractLocalStoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractLocalStoreSelfTest.java index 157ba93..2af1696 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractLocalStoreSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractLocalStoreSelfTest.java @@ -21,6 +21,7 @@ import com.google.common.collect.*; import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.cache.store.*; +import org.apache.ignite.cache.store.tx.*; import org.apache.ignite.configuration.*; import org.apache.ignite.events.*; import org.apache.ignite.internal.processors.cache.store.*; @@ -387,6 +388,11 @@ public abstract class GridCacheAbstractLocalStoreSelfTest extends GridCommonAbst } /** {@inheritDoc} */ + @Override public CacheStoreSessionListener getSessionListener() { + return null; + } + + /** {@inheritDoc} */ @Override public IgniteBiTuple<V, ?> load(K key) throws CacheLoaderException { return map.get(key); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b3d60836/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConfigurationConsistencySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConfigurationConsistencySelfTest.java 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConfigurationConsistencySelfTest.java index 44171a8..14f8289 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConfigurationConsistencySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConfigurationConsistencySelfTest.java @@ -27,6 +27,7 @@ import org.apache.ignite.cache.eviction.fifo.*; import org.apache.ignite.cache.eviction.lru.*; import org.apache.ignite.cache.eviction.random.*; import org.apache.ignite.cache.store.*; +import org.apache.ignite.cache.store.tx.*; import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.util.typedef.*; @@ -893,6 +894,11 @@ public class GridCacheConfigurationConsistencySelfTest extends GridCommonAbstrac @Override public void sessionEnd(boolean commit) { // No-op. } + + /** {@inheritDoc} */ + @Override public CacheStoreSessionListener getSessionListener() { + return null; + } } private static class TestRendezvousAffinityFunction extends RendezvousAffinityFunction { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b3d60836/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheGenericTestStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheGenericTestStore.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheGenericTestStore.java index bc54951..5baced2 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheGenericTestStore.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheGenericTestStore.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.*; import org.apache.ignite.cache.store.*; +import 
org.apache.ignite.cache.store.tx.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; @@ -255,6 +256,11 @@ public class GridCacheGenericTestStore<K, V> implements CacheStore<K, V> { // No-op. } + /** {@inheritDoc} */ + @Override public CacheStoreSessionListener getSessionListener() { + return null; + } + /** * Checks the flag and throws exception if it is set. Checks operation delay and sleeps * for specified amount of time, if needed. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b3d60836/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLifecycleAwareSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLifecycleAwareSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLifecycleAwareSelfTest.java index de5899d..a2b684c 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLifecycleAwareSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLifecycleAwareSelfTest.java @@ -21,6 +21,7 @@ import org.apache.ignite.cache.*; import org.apache.ignite.cache.affinity.*; import org.apache.ignite.cache.eviction.*; import org.apache.ignite.cache.store.*; +import org.apache.ignite.cache.store.tx.*; import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; import org.apache.ignite.lang.*; @@ -113,6 +114,11 @@ public class GridCacheLifecycleAwareSelfTest extends GridAbstractLifecycleAwareS @Override public void sessionEnd(boolean commit) { // No-op. 
} + + /** {@inheritDoc} */ + @Override public CacheStoreSessionListener getSessionListener() { + return null; + } } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b3d60836/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheStorePutxSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheStorePutxSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheStorePutxSelfTest.java index a5ed22a..50b515e 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheStorePutxSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheStorePutxSelfTest.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.*; import org.apache.ignite.cache.store.*; +import org.apache.ignite.cache.store.tx.*; import org.apache.ignite.configuration.*; import org.apache.ignite.lang.*; import org.apache.ignite.spi.discovery.tcp.*; @@ -28,7 +29,6 @@ import org.apache.ignite.testframework.junits.common.*; import org.apache.ignite.transactions.*; import org.jetbrains.annotations.*; -import javax.cache.configuration.*; import java.util.*; import java.util.concurrent.atomic.*; @@ -154,5 +154,10 @@ public class GridCacheStorePutxSelfTest extends GridCommonAbstractTest { @Override public void sessionEnd(boolean commit) { // No-op. 
} + + /** {@inheritDoc} */ + @Override public CacheStoreSessionListener getSessionListener() { + return null; + } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b3d60836/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestStore.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestStore.java index 1129e5a..7a60e02 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestStore.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestStore.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.*; import org.apache.ignite.cache.store.*; +import org.apache.ignite.cache.store.tx.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.internal.util.*; @@ -303,6 +304,11 @@ public final class GridCacheTestStore implements CacheStore<Integer, String> { // No-op. } + /** {@inheritDoc} */ + @Override public CacheStoreSessionListener getSessionListener() { + return null; + } + /** * Checks the flag and throws exception if it is set. Checks operation delay and sleeps * for specified amount of time, if needed. 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b3d60836/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxStoreExceptionAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxStoreExceptionAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxStoreExceptionAbstractSelfTest.java index 0f2acda..d81800b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxStoreExceptionAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxStoreExceptionAbstractSelfTest.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.*; import org.apache.ignite.cache.store.*; +import org.apache.ignite.cache.store.tx.*; import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; @@ -660,5 +661,10 @@ public abstract class IgniteTxStoreExceptionAbstractSelfTest extends GridCacheAb if (fail && commit) throw new CacheWriterException("Store exception"); } + + /** {@inheritDoc} */ + @Override public CacheStoreSessionListener getSessionListener() { + return null; + } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b3d60836/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java index cb32b13..bf09f26 100644 --- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed; import org.apache.ignite.*; import org.apache.ignite.cache.store.*; +import org.apache.ignite.cache.store.tx.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.util.typedef.*; @@ -290,6 +291,11 @@ public class IgniteCrossCacheTxStoreSelfTest extends GridCommonAbstractTest { evts.add("deleteAll " + cacheName + " " + keys.size()); } + /** {@inheritDoc} */ + @Override public CacheStoreSessionListener getSessionListener() { + return null; + } + /** * @return Store session. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b3d60836/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheStoreSessionAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheStoreSessionAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheStoreSessionAbstractTest.java index 7784003..1755a0b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheStoreSessionAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheStoreSessionAbstractTest.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.integration; import org.apache.ignite.*; import org.apache.ignite.cache.store.*; +import org.apache.ignite.cache.store.tx.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.processors.cache.*; import 
org.apache.ignite.internal.util.typedef.*; @@ -311,6 +312,11 @@ public abstract class IgniteCacheStoreSessionAbstractTest extends IgniteCacheAbs checkSession("deleteAll"); } + /** {@inheritDoc} */ + @Override public CacheStoreSessionListener getSessionListener() { + return null; + } + /** * @return Store session. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b3d60836/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheStoreSessionWriteBehindAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheStoreSessionWriteBehindAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheStoreSessionWriteBehindAbstractTest.java index d846881..4e4c74a 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheStoreSessionWriteBehindAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheStoreSessionWriteBehindAbstractTest.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.integration; import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.cache.store.*; +import org.apache.ignite.cache.store.tx.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.lang.*; @@ -204,6 +205,11 @@ public abstract class IgniteCacheStoreSessionWriteBehindAbstractTest extends Ign checkSession("deleteAll"); } + /** {@inheritDoc} */ + @Override public CacheStoreSessionListener getSessionListener() { + return null; + } + /** * @return Store session. 
*/ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b3d60836/modules/spring/pom.xml ---------------------------------------------------------------------- diff --git a/modules/spring/pom.xml b/modules/spring/pom.xml index 8494ad0..dbd1f0a 100644 --- a/modules/spring/pom.xml +++ b/modules/spring/pom.xml @@ -77,6 +77,12 @@ </dependency> <dependency> + <groupId>org.springframework</groupId> + <artifactId>spring-jdbc</artifactId> + <version>${spring.version}</version> + </dependency> + + <dependency> <groupId>commons-logging</groupId> <artifactId>commons-logging</artifactId> <version>1.1.1</version> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b3d60836/modules/spring/src/main/java/org/apache/ignite/cache/store/tx/CacheStoreSpringSessionListener.java ---------------------------------------------------------------------- diff --git a/modules/spring/src/main/java/org/apache/ignite/cache/store/tx/CacheStoreSpringSessionListener.java b/modules/spring/src/main/java/org/apache/ignite/cache/store/tx/CacheStoreSpringSessionListener.java new file mode 100644 index 0000000..d3be25c --- /dev/null +++ b/modules/spring/src/main/java/org/apache/ignite/cache/store/tx/CacheStoreSpringSessionListener.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.tx;
+
+import org.apache.ignite.cache.store.*;
+import org.springframework.transaction.*;
+import org.springframework.transaction.support.*;
+
+/**
+ * Cache store session listener that backs a transactional store session with a
+ * Spring-managed transaction obtained from the configured {@link PlatformTransactionManager}.
+ * <p>
+ * The Spring transaction is started in {@link #onSessionStart(CacheStoreSession)} with an
+ * isolation level derived from the cache transaction. Its status is kept in the session
+ * properties under {@link #TX_STATUS_KEY} and is committed or rolled back in
+ * {@link #onSessionEnd(CacheStoreSession, boolean)}.
+ */
+public class CacheStoreSpringSessionListener implements CacheStoreSessionListener {
+    /** Session property key under which the Spring transaction status is stored. */
+    public static final String TX_STATUS_KEY = "__tx_status_";
+
+    /** Spring transaction manager used to start, commit and roll back transactions. */
+    private PlatformTransactionManager txMgr;
+
+    /**
+     * Gets the transaction manager.
+     *
+     * @return Transaction manager, or {@code null} if it was not set.
+     */
+    public PlatformTransactionManager getTransactionManager() {
+        return txMgr;
+    }
+
+    /**
+     * Sets the transaction manager.
+     *
+     * @param txMgr Transaction manager.
+     */
+    public void setTransactionManager(PlatformTransactionManager txMgr) {
+        this.txMgr = txMgr;
+    }
+
+    /**
+     * Starts a Spring transaction for a transactional session and stores its status
+     * in the session properties. Sessions that are not within a cache transaction are ignored.
+     *
+     * @param ses Store session.
+     * @throws IllegalStateException If the transaction manager is not configured.
+     */
+    @Override public void onSessionStart(CacheStoreSession ses) {
+        // Without a cache transaction there is no isolation level to map, so no Spring
+        // transaction is started. An assertion alone would not guard this when -ea is off.
+        if (!ses.isWithinTransaction())
+            return;
+
+        if (txMgr == null)
+            throw new IllegalStateException("Transaction manager is not configured.");
+
+        assert !ses.properties().containsKey(TX_STATUS_KEY);
+
+        DefaultTransactionDefinition def = new DefaultTransactionDefinition();
+
+        def.setIsolationLevel(txIsolation(ses));
+
+        ses.properties().put(TX_STATUS_KEY, txMgr.getTransaction(def));
+    }
+
+    /**
+     * Commits or rolls back the Spring transaction recorded for the session, if there is one.
+     *
+     * @param ses Store session.
+     * @param commit {@code True} to commit, {@code false} to roll back.
+     */
+    @Override public void onSessionEnd(CacheStoreSession ses, boolean commit) {
+        // Remove instead of get, so a completed status does not linger in the session properties.
+        TransactionStatus tx = ses.<String, TransactionStatus>properties().remove(TX_STATUS_KEY);
+
+        if (tx != null) {
+            if (commit)
+                txMgr.commit(tx);
+            else
+                txMgr.rollback(tx);
+        }
+    }
+
+    /**
+     * Gets DB transaction isolation level based on ongoing cache transaction isolation.
+     *
+     * @param ses Store session; must be within a transaction.
+     * @return DB transaction isolation.
+     */
+    private int txIsolation(CacheStoreSession ses) {
+        switch (ses.transaction().isolation()) {
+            case READ_COMMITTED:
+                return TransactionDefinition.ISOLATION_READ_COMMITTED;
+
+            case REPEATABLE_READ:
+                return TransactionDefinition.ISOLATION_REPEATABLE_READ;
+
+            case SERIALIZABLE:
+                return TransactionDefinition.ISOLATION_SERIALIZABLE;
+
+            default:
+                throw new IllegalStateException("Unsupported transaction isolation: " +
+                    ses.transaction().isolation());
+        }
+    }
+}