Repository: incubator-ignite Updated Branches: refs/heads/ignite-sprint-5 541b1e0f6 -> 2f615220d
IGNITE-891 - Cache store improvements Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b97441f9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b97441f9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b97441f9 Branch: refs/heads/ignite-sprint-5 Commit: b97441f994bb34b23c54848f6520f506b4094590 Parents: 896b426 Author: Valentin Kulichenko <vkuliche...@gridgain.com> Authored: Thu May 14 19:12:43 2015 -0700 Committer: Valentin Kulichenko <vkuliche...@gridgain.com> Committed: Thu May 14 19:12:43 2015 -0700 ---------------------------------------------------------------------- .../store/jdbc/CacheJdbcPersonStore.java | 137 +++---------- .../store/jdbc/CacheJdbcStoreExample.java | 14 ++ .../cache/store/CacheStoreSessionListener.java | 27 +++ .../jdbc/CacheStoreSessionJdbcListener.java | 100 +++++++++ .../configuration/CacheConfiguration.java | 26 +++ .../configuration/IgniteConfiguration.java | 31 ++- .../cache/GridCacheSharedContext.java | 4 - .../cache/store/CacheOsStoreManager.java | 1 - .../store/GridCacheStoreManagerAdapter.java | 134 +++++++++--- .../cache/transactions/IgniteTxAdapter.java | 24 ++- .../transactions/IgniteTxLocalAdapter.java | 57 ++++-- ...cheStoreSessionListenerAbstractSelfTest.java | 204 +++++++++++++++++++ .../CacheStoreSessionJdbcListenerSelfTest.java | 145 +++++++++++++ .../testsuites/IgniteCacheTestSuite4.java | 3 + .../CacheStoreSessionHibernateListener.java | 106 ++++++++++ ...heStoreSessionHibernateListenerSelfTest.java | 150 ++++++++++++++ modules/spring/pom.xml | 14 ++ .../spring/CacheStoreSessionSpringListener.java | 125 ++++++++++++ ...CacheStoreSessionSpringListenerSelfTest.java | 179 ++++++++++++++++ .../testsuites/IgniteSpringTestSuite.java | 3 + 20 files changed, 1307 insertions(+), 177 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b97441f9/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/CacheJdbcPersonStore.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/CacheJdbcPersonStore.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/CacheJdbcPersonStore.java index 791f861..856512b 100644 --- a/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/CacheJdbcPersonStore.java +++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/CacheJdbcPersonStore.java @@ -19,26 +19,21 @@ package org.apache.ignite.examples.datagrid.store.jdbc; import org.apache.ignite.*; import org.apache.ignite.cache.store.*; +import org.apache.ignite.cache.store.jdbc.*; import org.apache.ignite.examples.datagrid.store.*; import org.apache.ignite.lang.*; import org.apache.ignite.resources.*; -import org.jetbrains.annotations.*; import javax.cache.*; import javax.cache.integration.*; import java.sql.*; -import java.util.*; /** * Example of {@link CacheStore} implementation that uses JDBC * transaction with cache transactions and maps {@link Long} to {@link Person}. - * */ public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> { - /** Transaction metadata attribute name. */ - private static final String ATTR_NAME = "SIMPLE_STORE_CONNECTION"; - - /** Auto-injected store session. */ + /** Store session. */ @CacheStoreSessionResource private CacheStoreSession ses; @@ -58,11 +53,12 @@ public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> { * @throws IgniteException If failed. 
*/ private void prepareDb() throws IgniteException { - try (Connection conn = openConnection(false); Statement st = conn.createStatement()) { + try ( + Connection conn = DriverManager.getConnection("jdbc:h2:mem:example;DB_CLOSE_DELAY=-1"); + Statement st = conn.createStatement() + ) { st.execute("create table if not exists PERSONS (id number unique, firstName varchar(255), " + "lastName varchar(255))"); - - conn.commit(); } catch (SQLException e) { throw new IgniteException("Failed to create database table.", e); @@ -70,32 +66,11 @@ public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> { } /** {@inheritDoc} */ - @Override public void sessionEnd(boolean commit) { - Map<String, Connection> props = ses.properties(); - - try (Connection conn = props.remove(ATTR_NAME)) { - if (conn != null) { - if (commit) - conn.commit(); - else - conn.rollback(); - } - - System.out.println(">>> Transaction ended [commit=" + commit + ']'); - } - catch (SQLException e) { - throw new CacheWriterException("Failed to end transaction: " + ses.transaction(), e); - } - } - - /** {@inheritDoc} */ @Override public Person load(Long key) { System.out.println(">>> Loading key: " + key); - Connection conn = null; - try { - conn = connection(); + Connection conn = connection(); try (PreparedStatement st = conn.prepareStatement("select * from PERSONS where id=?")) { st.setString(1, key.toString()); @@ -109,9 +84,6 @@ public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> { catch (SQLException e) { throw new CacheLoaderException("Failed to load object: " + key, e); } - finally { - end(conn); - } return null; } @@ -124,10 +96,8 @@ public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> { System.out.println(">>> Putting [key=" + key + ", val=" + val + ']'); - Connection conn = null; - try { - conn = connection(); + Connection conn = connection(); int updated; @@ -157,19 +127,14 @@ public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, 
Person> { catch (SQLException e) { throw new CacheLoaderException("Failed to put object [key=" + key + ", val=" + val + ']', e); } - finally { - end(conn); - } } /** {@inheritDoc} */ @Override public void delete(Object key) { System.out.println(">>> Removing key: " + key); - Connection conn = null; - try { - conn = connection(); + Connection conn = connection(); try (PreparedStatement st = conn.prepareStatement("delete from PERSONS where id=?")) { st.setLong(1, (Long)key); @@ -180,9 +145,6 @@ public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> { catch (SQLException e) { throw new CacheWriterException("Failed to remove object: " + key, e); } - finally { - end(conn); - } } /** {@inheritDoc} */ @@ -192,22 +154,23 @@ public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> { final int entryCnt = (Integer)args[0]; - try (Connection conn = connection()) { - try (PreparedStatement st = conn.prepareStatement("select * from PERSONS")) { - try (ResultSet rs = st.executeQuery()) { - int cnt = 0; + Connection conn = connection(); - while (cnt < entryCnt && rs.next()) { - Person person = new Person(rs.getLong(1), rs.getString(2), rs.getString(3)); + try ( + PreparedStatement st = conn.prepareStatement("select * from PERSONS"); + ResultSet rs = st.executeQuery() + ) { + int cnt = 0; - clo.apply(person.getId(), person); + while (cnt < entryCnt && rs.next()) { + Person person = new Person(rs.getLong(1), rs.getString(2), rs.getString(3)); - cnt++; - } + clo.apply(person.getId(), person); - System.out.println(">>> Loaded " + cnt + " values into cache."); - } + cnt++; } + + System.out.println(">>> Loaded " + cnt + " values into cache."); } catch (SQLException e) { throw new CacheLoaderException("Failed to load values from cache store.", e); @@ -215,61 +178,11 @@ public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> { } /** - * @return Connection. - * @throws SQLException In case of error. 
- */ - private Connection connection() throws SQLException { - // If there is an ongoing transaction, - // we must reuse the same connection. - if (ses.isWithinTransaction()) { - Map<Object, Object> props = ses.properties(); - - Connection conn = (Connection)props.get(ATTR_NAME); - - if (conn == null) { - conn = openConnection(false); - - // Store connection in session properties, so it can be accessed - // for other operations on the same transaction. - props.put(ATTR_NAME, conn); - } - - return conn; - } - // Transaction can be null in case of simple load or put operation. - else - return openConnection(true); - } - - /** - * Closes allocated resources depending on transaction status. - * - * @param conn Allocated connection. - */ - private void end(@Nullable Connection conn) { - if (!ses.isWithinTransaction() && conn != null) { - // Close connection right away if there is no transaction. - try { - conn.close(); - } - catch (SQLException ignored) { - // No-op. - } - } - } - - /** - * Gets connection from a pool. + * Gets JDBC connection attached to current session. * - * @param autocommit {@code true} If connection should use autocommit mode. - * @return Pooled connection. - * @throws SQLException In case of error. + * @return Connection. 
*/ - private Connection openConnection(boolean autocommit) throws SQLException { - Connection conn = DriverManager.getConnection("jdbc:h2:mem:example;DB_CLOSE_DELAY=-1"); - - conn.setAutoCommit(autocommit); - - return conn; + private Connection connection() { + return ses.<String, Connection>properties().get(CacheStoreSessionJdbcListener.JDBC_CONN_KEY); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b97441f9/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/CacheJdbcStoreExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/CacheJdbcStoreExample.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/CacheJdbcStoreExample.java index 1cb73c9..82e1079 100644 --- a/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/CacheJdbcStoreExample.java +++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/CacheJdbcStoreExample.java @@ -18,10 +18,13 @@ package org.apache.ignite.examples.datagrid.store.jdbc; import org.apache.ignite.*; +import org.apache.ignite.cache.store.*; +import org.apache.ignite.cache.store.jdbc.*; import org.apache.ignite.configuration.*; import org.apache.ignite.examples.*; import org.apache.ignite.examples.datagrid.store.*; import org.apache.ignite.transactions.*; +import org.h2.jdbcx.*; import javax.cache.configuration.*; import java.util.*; @@ -71,6 +74,17 @@ public class CacheJdbcStoreExample { // Configure JDBC store. cacheCfg.setCacheStoreFactory(FactoryBuilder.factoryOf(CacheJdbcPersonStore.class)); + // Configure JDBC session listener. 
+ cacheCfg.setCacheStoreSessionListenerFactories(new Factory<CacheStoreSessionListener>() { + @Override public CacheStoreSessionListener create() { + CacheStoreSessionJdbcListener lsnr = new CacheStoreSessionJdbcListener(); + + lsnr.setDataSource(JdbcConnectionPool.create("jdbc:h2:mem:example;DB_CLOSE_DELAY=-1", "", "")); + + return lsnr; + } + }); + cacheCfg.setReadThrough(true); cacheCfg.setWriteThrough(true); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b97441f9/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSessionListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSessionListener.java b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSessionListener.java new file mode 100644 index 0000000..e57714b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSessionListener.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.cache.store; + +/** + * TODO + */ +public interface CacheStoreSessionListener { + public void onSessionStart(CacheStoreSession ses); + + public void onSessionEnd(CacheStoreSession ses, boolean commit); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b97441f9/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheStoreSessionJdbcListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheStoreSessionJdbcListener.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheStoreSessionJdbcListener.java new file mode 100644 index 0000000..9622063 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheStoreSessionJdbcListener.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.cache.store.jdbc; + +import org.apache.ignite.cache.store.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.resources.*; + +import javax.cache.integration.*; +import javax.sql.*; +import java.sql.*; +import java.util.*; + +/** + * TODO + */ +public class CacheStoreSessionJdbcListener implements CacheStoreSessionListener { + /** Session key for JDBC connection. */ + public static final String JDBC_CONN_KEY = "__jdbc_conn_"; + + /** Data source. */ + private DataSource dataSrc; + + /** Store session. */ + @CacheStoreSessionResource + private CacheStoreSession ses; + + /** + * Sets data source. + * + * @param dataSrc Data source. + */ + public void setDataSource(DataSource dataSrc) { + A.notNull(dataSrc, "dataSrc"); + + this.dataSrc = dataSrc; + } + + /** + * Gets data source. + * + * @return Data source. + */ + public DataSource getDataSource() { + return dataSrc; + } + + /** {@inheritDoc} */ + @Override public void onSessionStart(CacheStoreSession ses) { + Map<String, Connection> props = ses.properties(); + + if (!props.containsKey(JDBC_CONN_KEY)) { + try { + Connection conn = dataSrc.getConnection(); + + conn.setAutoCommit(false); + + props.put(JDBC_CONN_KEY, conn); + } + catch (SQLException e) { + throw new CacheWriterException("Failed to start store session [tx=" + ses.transaction() + ']', e); + } + } + } + + /** {@inheritDoc} */ + @Override public void onSessionEnd(CacheStoreSession ses, boolean commit) { + Connection conn = ses.<String, Connection>properties().remove(JDBC_CONN_KEY); + + if (conn != null) { + try { + if (commit) + conn.commit(); + else + conn.rollback(); + } + catch (SQLException e) { + throw new CacheWriterException("Failed to start store session [tx=" + ses.transaction() + ']', e); + } + finally { + U.closeQuiet(conn); + } + } + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b97441f9/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java index df6b2ee..33a5711c 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java @@ -316,6 +316,9 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> { /** Cache topology validator. */ private TopologyValidator topValidator; + /** Cache store session listeners. */ + private Factory<? extends CacheStoreSessionListener>[] storeSesLsnrs; + /** Empty constructor (all values are initialized to their defaults). */ public CacheConfiguration() { /* No-op. */ @@ -389,6 +392,7 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> { sqlOnheapRowCacheSize = cc.getSqlOnheapRowCacheSize(); startSize = cc.getStartSize(); storeFactory = cc.getCacheStoreFactory(); + storeSesLsnrs = cc.getCacheStoreSessionListenerFactories(); swapEnabled = cc.isSwapEnabled(); tmLookupClsName = cc.getTransactionManagerLookupClassName(); topValidator = cc.getTopologyValidator(); @@ -1734,6 +1738,28 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> { return this; } + /** + * Gets cache store session listener factories. + * + * @return Cache store session listener factories. + */ + public Factory<? extends CacheStoreSessionListener>[] getCacheStoreSessionListenerFactories() { + return storeSesLsnrs; + } + + /** + * Cache store session listener factories. + * + * @param storeSesLsnrs Cache store session listener factories. + * @return {@code this} for chaining. 
+ */ + public CacheConfiguration setCacheStoreSessionListenerFactories( + Factory<? extends CacheStoreSessionListener>... storeSesLsnrs) { + this.storeSesLsnrs = storeSesLsnrs; + + return this; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(CacheConfiguration.class, this); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b97441f9/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java index ebe2b8e..96ac7e3 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java @@ -18,6 +18,7 @@ package org.apache.ignite.configuration; import org.apache.ignite.*; +import org.apache.ignite.cache.store.*; import org.apache.ignite.cluster.*; import org.apache.ignite.compute.*; import org.apache.ignite.events.*; @@ -52,6 +53,7 @@ import org.apache.ignite.spi.loadbalancing.roundrobin.*; import org.apache.ignite.spi.swapspace.*; import org.apache.ignite.spi.swapspace.file.*; +import javax.cache.configuration.*; import javax.cache.event.*; import javax.cache.expiry.*; import javax.cache.integration.*; @@ -334,9 +336,6 @@ public class IgniteConfiguration { /** Cache configurations. */ private CacheConfiguration[] cacheCfg; - /** Client cache configurations. */ - private NearCacheConfiguration[] nearCacheCfg; - /** Client mode flag. */ private Boolean clientMode; @@ -398,6 +397,9 @@ public class IgniteConfiguration { /** User's class loader. */ private ClassLoader classLdr; + /** Cache store session listeners. */ + private Factory<CacheStoreSessionListener>[] storeSesLsnrs; + /** * Creates valid grid configuration with all default values. 
*/ @@ -478,6 +480,7 @@ public class IgniteConfiguration { segResolvers = cfg.getSegmentationResolvers(); sndRetryCnt = cfg.getNetworkSendRetryCount(); sndRetryDelay = cfg.getNetworkSendRetryDelay(); + storeSesLsnrs = cfg.getCacheStoreSessionListenerFactories(); svcCfgs = cfg.getServiceConfiguration(); sysPoolSize = cfg.getSystemThreadPoolSize(); timeSrvPortBase = cfg.getTimeServerPortBase(); @@ -2242,6 +2245,28 @@ public class IgniteConfiguration { return classLdr; } + /** + * Gets cache store session listener factories. + * + * @return Cache store session listener factories. + */ + public Factory<CacheStoreSessionListener>[] getCacheStoreSessionListenerFactories() { + return storeSesLsnrs; + } + + /** + * Cache store session listener factories. + * + * @param storeSesLsnrs Cache store session listener factories. + * @return {@code this} for chaining. + */ + public IgniteConfiguration setCacheStoreSessionListenerFactories( + Factory<CacheStoreSessionListener>... storeSesLsnrs) { + this.storeSesLsnrs = storeSesLsnrs; + + return this; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(IgniteConfiguration.class, this); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b97441f9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java index 294c2b0..dacd1aa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java @@ -441,10 +441,6 @@ public class GridCacheSharedContext<K, V> { if (activeCacheCtx.cacheId() != cacheCtx.cacheId()) return false; } - - // 
Check that caches have the same store. - if (activeCacheCtx.store().store() != cacheCtx.store().store()) - return false; } return true; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b97441f9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheOsStoreManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheOsStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheOsStoreManager.java index 5fde622..02fe679 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheOsStoreManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheOsStoreManager.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.processors.cache.store; -import org.apache.ignite.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b97441f9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java index f9a966c..a9ea2c0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java @@ -35,6 +35,7 @@ import org.apache.ignite.transactions.*; import org.jetbrains.annotations.*; import javax.cache.*; +import javax.cache.configuration.*; import javax.cache.integration.*; import java.util.*; @@ -59,11 
+60,17 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt private ThreadLocal<SessionData> sesHolder; /** */ + private ThreadLocalSession locSes; + + /** */ private boolean locStore; /** */ private boolean writeThrough; + /** */ + private CacheStoreSessionListener[] sesLsnrs; + /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public void initialize(@Nullable CacheStore cfgStore, Map sesHolders) throws IgniteCheckedException { @@ -84,19 +91,43 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt sesHolder0 = ((Map<CacheStore, ThreadLocal>)sesHolders).get(cfgStore); if (sesHolder0 == null) { - ThreadLocalSession locSes = new ThreadLocalSession(); + sesHolder0 = new ThreadLocal<>(); - if (ctx.resource().injectStoreSession(cfgStore, locSes)) { - sesHolder0 = locSes.sesHolder; + locSes = new ThreadLocalSession(sesHolder0); + if (ctx.resource().injectStoreSession(cfgStore, locSes)) sesHolders.put(cfgStore, sesHolder0); - } } + else + locSes = new ThreadLocalSession(sesHolder0); } sesHolder = sesHolder0; locStore = U.hasAnnotation(cfgStore, CacheLocalStore.class); + + sesLsnrs = createSessionListeners(cfg.getCacheStoreSessionListenerFactories()); + + if (sesLsnrs == null) + sesLsnrs = createSessionListeners(ctx.config().getCacheStoreSessionListenerFactories()); + } + + /** + * Creates session listeners. + * + * @param factories Factories. + * @return Listeners. 
+ */ + private CacheStoreSessionListener[] createSessionListeners(Factory<CacheStoreSessionListener>[] factories) { + if (factories == null) + return null; + + CacheStoreSessionListener[] lsnrs = new CacheStoreSessionListener[factories.length]; + + for (int i = 0; i < factories.length; i++) + lsnrs[i] = factories[i].create(); + + return lsnrs; } /** {@inheritDoc} */ @@ -215,14 +246,14 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt sessionInit0(tx); - boolean thewEx = true; + boolean threwEx = true; Object val = null; try { val = singleThreadGate.load(storeKey); - thewEx = false; + threwEx = false; } catch (ClassCastException e) { handleClassCastException(e); @@ -234,7 +265,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt throw new IgniteCheckedException(new CacheLoaderException(e)); } finally { - sessionEnd0(tx, thewEx); + sessionEnd0(tx, threwEx); } if (log.isDebugEnabled()) @@ -349,7 +380,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt sessionInit0(tx); - boolean thewEx = true; + boolean threwEx = true; try { IgniteBiInClosure<Object, Object> c = new CI2<Object, Object>() { @@ -380,7 +411,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt else singleThreadGate.loadAll(keys0, c); - thewEx = false; + threwEx = false; } catch (ClassCastException e) { handleClassCastException(e); @@ -392,7 +423,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt throw new IgniteCheckedException(new CacheLoaderException(e)); } finally { - sessionEnd0(tx, thewEx); + sessionEnd0(tx, threwEx); } if (log.isDebugEnabled()) @@ -408,7 +439,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt sessionInit0(null); - boolean thewEx = true; + boolean threwEx = true; try { store.loadCache(new IgniteBiInClosure<Object, Object>() { @@ -431,7 +462,7 @@ public abstract class 
GridCacheStoreManagerAdapter extends GridCacheManagerAdapt } }, args); - thewEx = false; + threwEx = false; } catch (CacheLoaderException e) { throw new IgniteCheckedException(e); @@ -440,7 +471,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt throw new IgniteCheckedException(new CacheLoaderException(e)); } finally { - sessionEnd0(null, thewEx); + sessionEnd0(null, threwEx); } if (log.isDebugEnabled()) @@ -473,12 +504,12 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt sessionInit0(tx); - boolean thewEx = true; + boolean threwEx = true; try { store.write(new CacheEntryImpl<>(key, locStore ? F.t(val, ver) : val)); - thewEx = false; + threwEx = false; } catch (ClassCastException e) { handleClassCastException(e); @@ -490,7 +521,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt throw new IgniteCheckedException(new CacheWriterException(e)); } finally { - sessionEnd0(tx, thewEx); + sessionEnd0(tx, threwEx); } if (log.isDebugEnabled()) @@ -522,12 +553,12 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt sessionInit0(tx); - boolean thewEx = true; + boolean threwEx = true; try { store.writeAll(entries); - thewEx = false; + threwEx = false; } catch (ClassCastException e) { handleClassCastException(e); @@ -548,7 +579,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt throw new IgniteCheckedException(e); } finally { - sessionEnd0(tx, thewEx); + sessionEnd0(tx, threwEx); } if (log.isDebugEnabled()) @@ -576,12 +607,12 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt sessionInit0(tx); - boolean thewEx = true; + boolean threwEx = true; try { store.delete(key); - thewEx = false; + threwEx = false; } catch (ClassCastException e) { handleClassCastException(e); @@ -593,7 +624,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt throw new 
IgniteCheckedException(new CacheWriterException(e)); } finally { - sessionEnd0(tx, thewEx); + sessionEnd0(tx, threwEx); } if (log.isDebugEnabled()) @@ -625,12 +656,12 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt sessionInit0(tx); - boolean thewEx = true; + boolean threwEx = true; try { store.deleteAll(keys0); - thewEx = false; + threwEx = false; } catch (ClassCastException e) { handleClassCastException(e); @@ -645,7 +676,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt throw new IgniteCheckedException(e); } finally { - sessionEnd0(tx, thewEx); + sessionEnd0(tx, threwEx); } if (log.isDebugEnabled()) @@ -675,6 +706,11 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt sessionInit0(tx); try { + if (sesLsnrs != null) { + for (CacheStoreSessionListener lsnr : sesLsnrs) + lsnr.onSessionEnd(locSes, commit); + } + store.sessionEnd(commit); } finally { @@ -715,9 +751,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt * @param tx Current transaction. 
*/ private void sessionInit0(@Nullable IgniteInternalTx tx) { - if (sesHolder == null) - return; - + assert sesHolder != null; assert sesHolder.get() == null; SessionData ses; @@ -738,6 +772,15 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt ses = new SessionData(null, cctx.name()); sesHolder.set(ses); + + if (!ses.started()) { + if (sesLsnrs != null) { + for (CacheStoreSessionListener lsnr : sesLsnrs) + lsnr.onSessionStart(locSes); + } + + ses.onStarted(); + } } /** @@ -745,8 +788,14 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt */ private void sessionEnd0(@Nullable IgniteInternalTx tx, boolean threwEx) throws IgniteCheckedException { try { - if (tx == null) - store.sessionEnd(threwEx); + if (tx == null) { + if (sesLsnrs != null) { + for (CacheStoreSessionListener lsnr : sesLsnrs) + lsnr.onSessionEnd(locSes, !threwEx); + } + + store.sessionEnd(!threwEx); + } } catch (Exception e) { if (!threwEx) @@ -788,6 +837,9 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt @GridToStringInclude private Map<Object, Object> props; + /** */ + private boolean started; + /** * @param tx Current transaction. * @param cacheName Cache name. @@ -828,6 +880,19 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt this.cacheName = cacheName; } + /** + */ + private void onStarted() { + started = true; + } + + /** + * @return If session is started. 
+ */ + private boolean started() { + return started; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(SessionData.class, this, "tx", CU.txString(tx)); @@ -839,7 +904,14 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt */ private static class ThreadLocalSession implements CacheStoreSession { /** */ - private final ThreadLocal<SessionData> sesHolder = new ThreadLocal<>(); + private final ThreadLocal<SessionData> sesHolder; + + /** + * @param sesHolder Session holder. + */ + private ThreadLocalSession(ThreadLocal<SessionData> sesHolder) { + this.sesHolder = sesHolder; + } /** {@inheritDoc} */ @Nullable @Override public Transaction transaction() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b97441f9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index 64cc77f..f513b59 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -437,7 +437,12 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter /** {@inheritDoc} */ @Override public boolean storeUsed() { - return storeEnabled() && store() != null; + if (!storeEnabled()) + return false; + + Collection<CacheStoreManager> stores = stores(); + + return stores != null && !stores.isEmpty(); } /** @@ -445,13 +450,20 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter * * @return Store manager. 
*/ - protected CacheStoreManager store() { - if (!activeCacheIds().isEmpty()) { - int cacheId = F.first(activeCacheIds()); + protected Collection<CacheStoreManager> stores() { + Collection<Integer> cacheIds = activeCacheIds(); - CacheStoreManager store = cctx.cacheContext(cacheId).store(); + if (!cacheIds.isEmpty()) { + Collection<CacheStoreManager> stores = new ArrayList<>(cacheIds.size()); + + for (int cacheId : cacheIds) { + CacheStoreManager store = cctx.cacheContext(cacheId).store(); + + if (store.configured()) + stores.add(store); + } - return store.configured() ? store : null; + return stores; } return null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b97441f9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index 7e9095c..40bb36e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -496,17 +496,24 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter */ @SuppressWarnings({"CatchGenericClass"}) protected void batchStoreCommit(Iterable<IgniteTxEntry> writeEntries) throws IgniteCheckedException { - CacheStoreManager store = store(); + if (!storeEnabled() || (internal() && !groupLock())) + return; - if (store != null && store.isWriteThrough() && storeEnabled() && - (!internal() || groupLock()) && (near() || store.isWriteToStoreFromDht())) { + Collection<CacheStoreManager> stores = stores(); + + if (stores == null || stores.isEmpty()) + return; + + boolean isWriteToStoreFromDht = 
F.first(stores).isWriteToStoreFromDht(); + + if (near() || isWriteToStoreFromDht) { try { if (writeEntries != null) { Map<Object, IgniteBiTuple<Object, GridCacheVersion>> putMap = null; List<Object> rmvCol = null; CacheStoreManager writeStore = null; - boolean skipNear = near() && store.isWriteToStoreFromDht(); + boolean skipNear = near() && isWriteToStoreFromDht; for (IgniteTxEntry e : writeEntries) { if ((skipNear && e.cached().isNear()) || e.skipStore()) @@ -560,12 +567,15 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter val = cacheCtx.toCacheObject(cacheCtx.unwrapTemporary(interceptorVal)); } - if (putMap == null) - putMap = new LinkedHashMap<>(writeMap().size(), 1.0f); + if (writeStore == null) + writeStore = cacheCtx.store(); - putMap.put(CU.value(key, cacheCtx, false), F.t(CU.value(val, cacheCtx, false), ver)); + if (writeStore.isWriteThrough()) { + if (putMap == null) + putMap = new LinkedHashMap<>(writeMap().size(), 1.0f); - writeStore = cacheCtx.store(); + putMap.put(CU.value(key, cacheCtx, false), F.t(CU.value(val, cacheCtx, false), ver)); + } } else if (op == DELETE) { // Batch-process all puts if needed. @@ -597,12 +607,15 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter continue; } - if (rmvCol == null) - rmvCol = new ArrayList<>(); + if (writeStore == null) + writeStore = cacheCtx.store(); - rmvCol.add(key.value(cacheCtx.cacheObjectContext(), false)); + if (writeStore.isWriteThrough()) { + if (rmvCol == null) + rmvCol = new ArrayList<>(); - writeStore = cacheCtx.store(); + rmvCol.add(key.value(cacheCtx.cacheObjectContext(), false)); + } } else if (log.isDebugEnabled()) log.debug("Ignoring NOOP entry for batch store commit: " + e); @@ -626,7 +639,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter } // Commit while locks are held. 
- store.sessionEnd(this, true); + for (CacheStoreManager store : stores) + store.sessionEnd(this, true); } catch (IgniteCheckedException ex) { commitError(ex); @@ -994,11 +1008,12 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter } } else { - CacheStoreManager store = store(); + Collection<CacheStoreManager> stores = stores(); - if (store != null && (!internal() || groupLock())) { + if (stores != null && !stores.isEmpty() && (!internal() || groupLock())) { try { - store.sessionEnd(this, true); + for (CacheStoreManager store : stores) + store.sessionEnd(this, true); } catch (IgniteCheckedException e) { commitError(e); @@ -1099,11 +1114,13 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter cctx.tm().rollbackTx(this); - CacheStoreManager store = store(); + Collection<CacheStoreManager> stores = stores(); - if (store != null && (near() || store.isWriteToStoreFromDht())) { - if (!internal() || groupLock()) - store.sessionEnd(this, false); + if (stores != null && !stores.isEmpty() && (near() || F.first(stores).isWriteToStoreFromDht())) { + if (!internal() || groupLock()) { + for (CacheStoreManager store : stores) + store.sessionEnd(this, false); + } } } catch (Error | IgniteCheckedException | RuntimeException e) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b97441f9/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerAbstractSelfTest.java new file mode 100644 index 0000000..5a01c2d --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerAbstractSelfTest.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * 
contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.store; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.junits.common.*; +import org.apache.ignite.transactions.*; + +import javax.cache.configuration.*; +import java.io.*; +import java.util.concurrent.atomic.*; + +/** + * Tests for store session listeners. 
+ */ +public abstract class CacheStoreSessionListenerAbstractSelfTest extends GridCommonAbstractTest implements Serializable { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + protected static final AtomicInteger loadCacheCnt = new AtomicInteger(); + + /** */ + protected static final AtomicInteger loadCnt = new AtomicInteger(); + + /** */ + protected static final AtomicInteger writeCnt = new AtomicInteger(); + + /** */ + protected static final AtomicInteger deleteCnt = new AtomicInteger(); + + /** */ + protected static final AtomicInteger reuseCnt = new AtomicInteger(); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(disco); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + startGridsMultiThreaded(3); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + loadCacheCnt.set(0); + loadCnt.set(0); + writeCnt.set(0); + deleteCnt.set(0); + reuseCnt.set(0); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicCache() throws Exception { + CacheConfiguration<Integer, Integer> cfg = cacheConfiguration(null, CacheAtomicityMode.ATOMIC); + + try (IgniteCache<Integer, Integer> cache = ignite(0).createCache(cfg)) { + cache.loadCache(null); + cache.get(1); + cache.put(1, 1); + cache.remove(1); + } + + assertEquals(3, loadCacheCnt.get()); + assertEquals(1, loadCnt.get()); + assertEquals(1, writeCnt.get()); + assertEquals(1, deleteCnt.get()); + assertEquals(0, reuseCnt.get()); + } + + /** + * @throws Exception If failed. 
+ */ + public void testTransactionalCache() throws Exception { + CacheConfiguration<Integer, Integer> cfg = cacheConfiguration(null, CacheAtomicityMode.TRANSACTIONAL); + + try (IgniteCache<Integer, Integer> cache = ignite(0).createCache(cfg)) { + cache.loadCache(null); + cache.get(1); + cache.put(1, 1); + cache.remove(1); + } + + assertEquals(3, loadCacheCnt.get()); + assertEquals(1, loadCnt.get()); + assertEquals(1, writeCnt.get()); + assertEquals(1, deleteCnt.get()); + assertEquals(0, reuseCnt.get()); + + } + + /** + * @throws Exception If failed. + */ + public void testExplicitTransaction() throws Exception { + CacheConfiguration<Integer, Integer> cfg = cacheConfiguration(null, CacheAtomicityMode.TRANSACTIONAL); + + try (IgniteCache<Integer, Integer> cache = ignite(0).createCache(cfg)) { + try (Transaction tx = ignite(0).transactions().txStart()) { + cache.put(1, 1); + cache.put(2, 2); + cache.remove(3); + cache.remove(4); + + tx.commit(); + } + } + + assertEquals(2, writeCnt.get()); + assertEquals(2, deleteCnt.get()); + assertEquals(3, reuseCnt.get()); + } + + /** + * @throws Exception If failed. + */ + public void testCrossCacheTransaction() throws Exception { + CacheConfiguration<Integer, Integer> cfg1 = cacheConfiguration("cache1", CacheAtomicityMode.TRANSACTIONAL); + CacheConfiguration<Integer, Integer> cfg2 = cacheConfiguration("cache2", CacheAtomicityMode.TRANSACTIONAL); + + try ( + IgniteCache<Integer, Integer> cache1 = ignite(0).createCache(cfg1); + IgniteCache<Integer, Integer> cache2 = ignite(0).createCache(cfg2) + ) { + try (Transaction tx = ignite(0).transactions().txStart()) { + cache1.put(1, 1); + cache2.put(2, 2); + cache1.remove(3); + cache2.remove(4); + + tx.commit(); + } + } + + assertEquals(2, writeCnt.get()); + assertEquals(2, deleteCnt.get()); + assertEquals(3, reuseCnt.get()); + } + + /** + * @param name Cache name. + * @param atomicity Atomicity mode. + * @return Cache configuration. 
+ */ + private CacheConfiguration<Integer, Integer> cacheConfiguration(String name, CacheAtomicityMode atomicity) { + CacheConfiguration<Integer, Integer> cfg = new CacheConfiguration<>(); + + cfg.setName(name); + cfg.setAtomicityMode(atomicity); + cfg.setCacheStoreFactory(storeFactory()); + cfg.setCacheStoreSessionListenerFactories(sessionListenerFactory()); + cfg.setReadThrough(true); + cfg.setWriteThrough(true); + cfg.setLoadPreviousValue(true); + + return cfg; + } + + /** + * @return Store factory. + */ + protected abstract Factory<? extends CacheStore<Integer, Integer>> storeFactory(); + + /** + * @return Session listener factory. + */ + protected abstract Factory<CacheStoreSessionListener> sessionListenerFactory(); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b97441f9/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheStoreSessionJdbcListenerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheStoreSessionJdbcListenerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheStoreSessionJdbcListenerSelfTest.java new file mode 100644 index 0000000..9020e0d --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheStoreSessionJdbcListenerSelfTest.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.store.jdbc; + +import org.apache.ignite.cache.store.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.resources.*; +import org.h2.jdbcx.*; + +import javax.cache.*; +import javax.cache.configuration.*; +import javax.cache.integration.*; +import java.sql.*; +import java.util.*; + +/** + * Tests for {@link CacheStoreSessionJdbcListener}. + */ +public class CacheStoreSessionJdbcListenerSelfTest extends CacheStoreSessionListenerAbstractSelfTest { + /** {@inheritDoc} */ + @Override protected Factory<? extends CacheStore<Integer, Integer>> storeFactory() { + return new Factory<CacheStore<Integer, Integer>>() { + @Override public CacheStore<Integer, Integer> create() { + return new Store(); + } + }; + } + + /** {@inheritDoc} */ + @Override protected Factory<CacheStoreSessionListener> sessionListenerFactory() { + return new Factory<CacheStoreSessionListener>() { + @Override public CacheStoreSessionListener create() { + CacheStoreSessionJdbcListener lsnr = new CacheStoreSessionJdbcListener(); + + lsnr.setDataSource(JdbcConnectionPool.create("jdbc:h2:mem:example;DB_CLOSE_DELAY=-1", "", "")); + + return lsnr; + } + }; + } + + /** + */ + private static class Store extends CacheStoreAdapter<Integer, Integer> { + /** */ + private static String SES_CONN_KEY = "ses_conn"; + + /** */ + @CacheStoreSessionResource + private CacheStoreSession ses; + + /** {@inheritDoc} */ + @Override public void loadCache(IgniteBiInClosure<Integer, Integer> clo, Object... 
args) { + loadCacheCnt.incrementAndGet(); + + checkConnection(); + } + + /** {@inheritDoc} */ + @Override public Integer load(Integer key) throws CacheLoaderException { + loadCnt.incrementAndGet(); + + checkConnection(); + + return null; + } + + /** {@inheritDoc} */ + @Override public void write(Cache.Entry<? extends Integer, ? extends Integer> entry) + throws CacheWriterException { + writeCnt.incrementAndGet(); + + checkConnection(); + } + + /** {@inheritDoc} */ + @Override public void delete(Object key) throws CacheWriterException { + deleteCnt.incrementAndGet(); + + checkConnection(); + } + + /** {@inheritDoc} */ + @Override public void sessionEnd(boolean commit) { + assertNull(connection()); + } + + /** + */ + private void checkConnection() { + Connection conn = connection(); + + assertNotNull(conn); + + try { + assertFalse(conn.isClosed()); + assertFalse(conn.getAutoCommit()); + } + catch (SQLException e) { + throw new RuntimeException(e); + } + + verifySameInstance(conn); + } + + /** + * @param conn Connection. + */ + private void verifySameInstance(Connection conn) { + Map<String, Connection> props = ses.properties(); + + Connection sesConn = props.get(SES_CONN_KEY); + + if (sesConn == null) + props.put(SES_CONN_KEY, conn); + else { + assertSame(conn, sesConn); + + reuseCnt.incrementAndGet(); + } + } + + /** + * @return Connection. 
+ */ + private Connection connection() { + return ses.<String, Connection>properties().get(CacheStoreSessionJdbcListener.JDBC_CONN_KEY); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b97441f9/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java index aaf7e5b..afb67f5 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java @@ -18,6 +18,7 @@ package org.apache.ignite.testsuites; import junit.framework.*; +import org.apache.ignite.cache.store.jdbc.*; import org.apache.ignite.internal.processors.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.*; @@ -130,6 +131,8 @@ public class IgniteCacheTestSuite4 extends TestSuite { suite.addTestSuite(CacheOffheapMapEntrySelfTest.class); + suite.addTestSuite(CacheStoreSessionJdbcListenerSelfTest.class); + return suite; } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b97441f9/modules/hibernate/src/main/java/org/apache/ignite/cache/store/hibernate/CacheStoreSessionHibernateListener.java ---------------------------------------------------------------------- diff --git a/modules/hibernate/src/main/java/org/apache/ignite/cache/store/hibernate/CacheStoreSessionHibernateListener.java b/modules/hibernate/src/main/java/org/apache/ignite/cache/store/hibernate/CacheStoreSessionHibernateListener.java new file mode 100644 index 0000000..eff5e6c --- /dev/null +++ b/modules/hibernate/src/main/java/org/apache/ignite/cache/store/hibernate/CacheStoreSessionHibernateListener.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.store.hibernate; + +import org.apache.ignite.cache.store.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.resources.*; +import org.hibernate.*; + +import javax.cache.integration.*; +import java.util.*; + +/** + * Cache store session listener that opens a Hibernate session when a store session starts and commits or rolls back and closes it when the store session ends. + */ +public class CacheStoreSessionHibernateListener implements CacheStoreSessionListener { + /** Session key for Hibernate session. */ + public static final String HIBERNATE_SES_KEY = "__hibernate_ses_"; + + /** Hibernate session factory. */ + private SessionFactory sesFactory; + + /** Store session. */ + @CacheStoreSessionResource + private CacheStoreSession ses; + + /** + * Sets Hibernate session factory. + * + * @param sesFactory Session factory. + */ + public void setSessionFactory(SessionFactory sesFactory) { + A.notNull(sesFactory, "sesFactory"); + + this.sesFactory = sesFactory; + } + + /** + * Gets Hibernate session factory. + * + * @return Session factory. 
+ */ + public SessionFactory getSessionFactory() { + return sesFactory; + } + + /** {@inheritDoc} */ + @Override public void onSessionStart(CacheStoreSession ses) { + Map<String, Session> props = ses.properties(); + + if (!props.containsKey(HIBERNATE_SES_KEY)) { + try { + Session hibSes = sesFactory.openSession(); + + props.put(HIBERNATE_SES_KEY, hibSes); + + if (ses.isWithinTransaction()) + hibSes.beginTransaction(); + } + catch (HibernateException e) { + throw new CacheWriterException("Failed to start store session [tx=" + ses.transaction() + ']', e); + } + } + } + + /** {@inheritDoc} */ + @Override public void onSessionEnd(CacheStoreSession ses, boolean commit) { + Session hibSes = ses.<String, Session>properties().remove(HIBERNATE_SES_KEY); + + if (hibSes != null) { + try { + Transaction tx = hibSes.getTransaction(); + + if (commit) { + hibSes.flush(); + + if (tx != null) + tx.commit(); + } + else if (tx != null) + tx.rollback(); + } + catch (HibernateException e) { + throw new CacheWriterException("Failed to end store session [tx=" + ses.transaction() + ']', e); + } + finally { + hibSes.close(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b97441f9/modules/hibernate/src/test/java/org/apache/ignite/cache/store/hibernate/CacheStoreSessionHibernateListenerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hibernate/src/test/java/org/apache/ignite/cache/store/hibernate/CacheStoreSessionHibernateListenerSelfTest.java b/modules/hibernate/src/test/java/org/apache/ignite/cache/store/hibernate/CacheStoreSessionHibernateListenerSelfTest.java new file mode 100644 index 0000000..85b0b95 --- /dev/null +++ b/modules/hibernate/src/test/java/org/apache/ignite/cache/store/hibernate/CacheStoreSessionHibernateListenerSelfTest.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.store.hibernate; + +import org.apache.ignite.cache.store.*; +import org.apache.ignite.cache.store.jdbc.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.resources.*; +import org.hibernate.*; +import org.hibernate.cfg.Configuration; +import org.hibernate.service.*; + +import javax.cache.Cache; +import javax.cache.configuration.*; +import javax.cache.integration.*; +import java.util.*; + +/** + * Tests for {@link CacheStoreSessionHibernateListener}. + */ +public class CacheStoreSessionHibernateListenerSelfTest extends CacheStoreSessionListenerAbstractSelfTest { + /** {@inheritDoc} */ + @Override protected Factory<? extends CacheStore<Integer, Integer>> storeFactory() { + return new Factory<CacheStore<Integer, Integer>>() { + @Override public CacheStore<Integer, Integer> create() { + return new Store(); + } + }; + } + + /** {@inheritDoc} */ + @Override protected Factory<CacheStoreSessionListener> sessionListenerFactory() { + return new Factory<CacheStoreSessionListener>() { + @Override public CacheStoreSessionListener create() { + CacheStoreSessionHibernateListener lsnr = new CacheStoreSessionHibernateListener(); + + Configuration cfg = new Configuration(). + setProperty("hibernate.dialect", "org.hibernate.dialect.H2Dialect"). 
+ setProperty("hibernate.connection.datasource", "jdbc:h2:mem:example;DB_CLOSE_DELAY=-1"); + + lsnr.setSessionFactory(cfg.buildSessionFactory(new ServiceRegistryBuilder().buildServiceRegistry())); + + return lsnr; + } + }; + } + + /** + */ + private static class Store extends CacheStoreAdapter<Integer, Integer> { + /** */ + private static String SES_CONN_KEY = "ses_conn"; + + /** */ + @CacheStoreSessionResource + private CacheStoreSession ses; + + /** {@inheritDoc} */ + @Override public void loadCache(IgniteBiInClosure<Integer, Integer> clo, Object... args) { + loadCacheCnt.incrementAndGet(); + + checkSession(); + } + + /** {@inheritDoc} */ + @Override public Integer load(Integer key) throws CacheLoaderException { + loadCnt.incrementAndGet(); + + checkSession(); + + return null; + } + + /** {@inheritDoc} */ + @Override public void write(Cache.Entry<? extends Integer, ? extends Integer> entry) + throws CacheWriterException { + writeCnt.incrementAndGet(); + + checkSession(); + } + + /** {@inheritDoc} */ + @Override public void delete(Object key) throws CacheWriterException { + deleteCnt.incrementAndGet(); + + checkSession(); + } + + /** {@inheritDoc} */ + @Override public void sessionEnd(boolean commit) { + assertNull(session()); + } + + /** + */ + private void checkSession() { + Session hibSes = session(); + + assertNotNull(hibSes); + + assertTrue(hibSes.isOpen()); + + if (ses.isWithinTransaction()) + assertNotNull(hibSes.getTransaction()); + else + assertNull(hibSes.getTransaction()); + + verifySameInstance(hibSes); + } + + /** + * @param hibSes Session. + */ + private void verifySameInstance(Session hibSes) { + Map<String, Session> props = ses.properties(); + + Session sesConn = props.get(SES_CONN_KEY); + + if (sesConn == null) + props.put(SES_CONN_KEY, hibSes); + else { + assertSame(hibSes, sesConn); + + reuseCnt.incrementAndGet(); + } + } + + /** + * @return Hibernate session. 
+ */ + private Session session() { + return ses.<String, Session>properties().get(CacheStoreSessionHibernateListener.HIBERNATE_SES_KEY); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b97441f9/modules/spring/pom.xml ---------------------------------------------------------------------- diff --git a/modules/spring/pom.xml b/modules/spring/pom.xml index 8494ad0..58f4356 100644 --- a/modules/spring/pom.xml +++ b/modules/spring/pom.xml @@ -103,6 +103,20 @@ <type>test-jar</type> <scope>test</scope> </dependency> + + <dependency> + <groupId>org.springframework</groupId> + <artifactId>spring-jdbc</artifactId> + <version>${spring.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>com.h2database</groupId> + <artifactId>h2</artifactId> + <version>1.3.175</version> + <scope>test</scope> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b97441f9/modules/spring/src/main/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListener.java ---------------------------------------------------------------------- diff --git a/modules/spring/src/main/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListener.java b/modules/spring/src/main/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListener.java new file mode 100644 index 0000000..a2cf622 --- /dev/null +++ b/modules/spring/src/main/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListener.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.store.spring; + +import org.apache.ignite.cache.store.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.transactions.*; +import org.springframework.transaction.*; +import org.springframework.transaction.support.*; + +import javax.cache.integration.*; + +/** + * Cache store session listener that starts a Spring-managed transaction when a transactional store session starts and commits or rolls it back when the session ends. + */ +public class CacheStoreSessionSpringListener implements CacheStoreSessionListener { + /** Session key for transaction status. */ + public static final String TX_STATUS_KEY = "__spring_tx_status_"; + + /** Transaction manager. */ + private PlatformTransactionManager txMgr; + + /** + * Sets transaction manager. + * + * @param txMgr Transaction manager. + */ + public void setTransactionManager(PlatformTransactionManager txMgr) { + A.notNull(txMgr, "txMgr"); + + this.txMgr = txMgr; + } + + /** + * Gets transaction manager. + * + * @return Transaction manager. 
+ */ + public PlatformTransactionManager getTransactionManager() { + return txMgr; + } + + /** {@inheritDoc} */ + @Override public void onSessionStart(CacheStoreSession ses) { + if (ses.isWithinTransaction()) { + try { + ses.properties().put(TX_STATUS_KEY, txMgr.getTransaction(definition(ses.transaction()))); + } + catch (TransactionException e) { + throw new CacheWriterException("Failed to start store session [tx=" + ses.transaction() + ']', e); + } + } + } + + /** {@inheritDoc} */ + @Override public void onSessionEnd(CacheStoreSession ses, boolean commit) { + if (ses.isWithinTransaction()) { + TransactionStatus tx = ses.<String, TransactionStatus>properties().remove(TX_STATUS_KEY); + + if (tx != null) { + try { + if (commit) + txMgr.commit(tx); + else + txMgr.rollback(tx); + } + catch (TransactionException e) { + throw new CacheWriterException("Failed to end store session [tx=" + ses.transaction() + ']', e); + } + } + } + } + + /** + * Gets DB transaction definition based on ongoing cache transaction. + * + * @return DB transaction definition. + */ + private TransactionDefinition definition(Transaction tx) { + assert tx != null; + + DefaultTransactionDefinition def = new DefaultTransactionDefinition(); + + def.setIsolationLevel(isolationLevel(tx.isolation())); + + return def; + } + + /** + * Gets DB transaction isolation level based on ongoing cache transaction isolation. + * + * @param isolation Cache transaction isolation. + * @return DB transaction isolation. + */ + private int isolationLevel(TransactionIsolation isolation) { + switch (isolation) { + case READ_COMMITTED: + return TransactionDefinition.ISOLATION_READ_COMMITTED; + + case REPEATABLE_READ: + return TransactionDefinition.ISOLATION_REPEATABLE_READ; + + case SERIALIZABLE: + return TransactionDefinition.ISOLATION_SERIALIZABLE; + + default: + throw new IllegalStateException(); // Will never happen. 
+ } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b97441f9/modules/spring/src/test/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListenerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/spring/src/test/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListenerSelfTest.java b/modules/spring/src/test/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListenerSelfTest.java new file mode 100644 index 0000000..a7ca317 --- /dev/null +++ b/modules/spring/src/test/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListenerSelfTest.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.cache.store.spring; + +import org.apache.ignite.cache.store.*; +import org.apache.ignite.cache.store.jdbc.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.resources.*; +import org.springframework.jdbc.core.*; +import org.springframework.jdbc.datasource.*; +import org.springframework.transaction.*; + +import javax.cache.*; +import javax.cache.configuration.*; +import javax.cache.integration.*; +import javax.sql.*; +import java.sql.*; +import java.util.*; + +/** + * Tests for {@link CacheStoreSessionSpringListener}. + */ +public class CacheStoreSessionSpringListenerSelfTest extends CacheStoreSessionListenerAbstractSelfTest { + /** Data source shared by the test store and the listener's transaction manager. */ + private static final DataSource DATA_SRC = new DriverManagerDataSource("jdbc:h2:mem:example;DB_CLOSE_DELAY=-1"); + + /** {@inheritDoc} */ + @Override protected Factory<? extends CacheStore<Integer, Integer>> storeFactory() { + return new Factory<CacheStore<Integer, Integer>>() { + @Override public CacheStore<Integer, Integer> create() { + return new Store(new JdbcTemplate(DATA_SRC)); + } + }; + } + + /** {@inheritDoc} */ + @Override protected Factory<CacheStoreSessionListener> sessionListenerFactory() { + return new Factory<CacheStoreSessionListener>() { + @Override public CacheStoreSessionListener create() { + CacheStoreSessionSpringListener lsnr = new CacheStoreSessionSpringListener(); + + lsnr.setTransactionManager(new DataSourceTransactionManager(DATA_SRC)); + + return lsnr; + } + }; + } + + /** + */ + private static class Store extends CacheStoreAdapter<Integer, Integer> { + /** Session property key under which the first connection seen by the store is remembered. */ + private static String SES_CONN_KEY = "ses_conn"; + + /** JDBC template whose data source is used to obtain connections. */ + private final JdbcTemplate jdbc; + + /** Store session. */ + @CacheStoreSessionResource + private CacheStoreSession ses; + + /** + * @param jdbc JDBC template. + */ + private Store(JdbcTemplate jdbc) { + this.jdbc = jdbc; + } + + /** {@inheritDoc} */ + @Override public void loadCache(IgniteBiInClosure<Integer, Integer> clo, Object...
args) { + loadCacheCnt.incrementAndGet(); + + checkTransaction(); + checkConnection(); + } + + /** {@inheritDoc} */ + @Override public Integer load(Integer key) throws CacheLoaderException { + loadCnt.incrementAndGet(); + + checkTransaction(); + checkConnection(); + + return null; + } + + /** {@inheritDoc} */ + @Override public void write(Cache.Entry<? extends Integer, ? extends Integer> entry) + throws CacheWriterException { + writeCnt.incrementAndGet(); + + checkTransaction(); + checkConnection(); + } + + /** {@inheritDoc} */ + @Override public void delete(Object key) throws CacheWriterException { + deleteCnt.incrementAndGet(); + + checkTransaction(); + checkConnection(); + } + + /** {@inheritDoc} */ + @Override public void sessionEnd(boolean commit) { + assertNull(transaction()); + } + + /** + */ + private void checkTransaction() { + TransactionStatus tx = transaction(); + + if (ses.isWithinTransaction()) { + assertNotNull(tx); + assertFalse(tx.isCompleted()); + } + else + assertNull(tx); + } + + /** + * @return Transaction status stored in session properties by the listener, or {@code null} if there is none. + */ + private TransactionStatus transaction() { + return ses.<String, TransactionStatus>properties().get(CacheStoreSessionSpringListener.TX_STATUS_KEY); + } + + /** + */ + private void checkConnection() { + Connection conn = DataSourceUtils.getConnection(jdbc.getDataSource()); + + assertNotNull(conn); + + try { + assertFalse(conn.isClosed()); + assertEquals(!ses.isWithinTransaction(), conn.getAutoCommit()); + } + catch (SQLException e) { + throw new RuntimeException(e); + } + + verifySameInstance(conn); + } + + /** + * @param conn Connection to remember in session properties on first call and to compare by identity afterwards.
+ */ + private void verifySameInstance(Connection conn) { + Map<String, Connection> props = ses.properties(); + + Connection sesConn = props.get(SES_CONN_KEY); + + if (sesConn == null) + props.put(SES_CONN_KEY, conn); + else { + assertSame(conn, sesConn); + + reuseCnt.incrementAndGet(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b97441f9/modules/spring/src/test/java/org/apache/ignite/testsuites/IgniteSpringTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/spring/src/test/java/org/apache/ignite/testsuites/IgniteSpringTestSuite.java b/modules/spring/src/test/java/org/apache/ignite/testsuites/IgniteSpringTestSuite.java index 8251c18..0b7e471 100644 --- a/modules/spring/src/test/java/org/apache/ignite/testsuites/IgniteSpringTestSuite.java +++ b/modules/spring/src/test/java/org/apache/ignite/testsuites/IgniteSpringTestSuite.java @@ -18,6 +18,7 @@ package org.apache.ignite.testsuites; import junit.framework.*; +import org.apache.ignite.cache.store.spring.*; import org.apache.ignite.internal.*; import org.apache.ignite.p2p.*; import org.apache.ignite.spring.*; @@ -47,6 +48,8 @@ public class IgniteSpringTestSuite extends TestSuite { suite.addTest(new TestSuite(IgniteStartFromStreamConfigurationTest.class)); + suite.addTestSuite(CacheStoreSessionSpringListenerSelfTest.class); + return suite; } }