IGNITE-891 - Cache store improvements
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/e47f85ac Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/e47f85ac Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/e47f85ac Branch: refs/heads/ignite-sprint-5 Commit: e47f85acae51f609e0bd1a1465c38d4a1bc96576 Parents: 4a55d29 Author: Valentin Kulichenko <vkuliche...@gridgain.com> Authored: Fri May 22 00:02:07 2015 -0700 Committer: Valentin Kulichenko <vkuliche...@gridgain.com> Committed: Fri May 22 00:02:07 2015 -0700 ---------------------------------------------------------------------- .../apache/ignite/cache/store/CacheStore.java | 2 + .../cache/store/CacheStoreSessionListener.java | 99 +++++++++++++++++++- .../jdbc/CacheStoreSessionJdbcListener.java | 44 ++++++++- .../configuration/CacheConfiguration.java | 6 ++ .../configuration/IgniteConfiguration.java | 7 ++ .../CacheStoreSessionHibernateListener.java | 48 +++++++++- .../spring/CacheStoreSessionSpringListener.java | 32 ++++++- 7 files changed, 230 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e47f85ac/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStore.java b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStore.java index d018298..5bfdda1 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStore.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStore.java @@ -94,6 +94,8 @@ public interface CacheStore<K, V> extends CacheLoader<K, V>, CacheWriter<K, V> { * @throws CacheWriterException If commit or rollback failed. Note that commit failure in some cases * may bring cache transaction into {@link TransactionState#UNKNOWN} which will * consequently cause all transacted entries to be invalidated. + * @deprecated Use {@link CacheStoreSessionListener} instead (refer to its JavaDoc for details). */ + @Deprecated public void sessionEnd(boolean commit) throws CacheWriterException; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e47f85ac/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSessionListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSessionListener.java b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSessionListener.java index cba66c3..8b7cd8f 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSessionListener.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSessionListener.java @@ -17,12 +17,104 @@ package org.apache.ignite.cache.store; +import org.apache.ignite.cache.store.jdbc.*; +import org.apache.ignite.configuration.*; + +import javax.cache.configuration.*; +import javax.sql.*; + /** - * Store session listener. + * Cache store session listener that allows to implement callbacks + * for session lifecycle. + * <p> + * The most common use case for session listeners is database + * connection and transaction management. Store can be invoked one + * or several times during one session, depending on whether it's + * executed within cache transaction or not. In any case, you have + * to create a connection when session is started and commit it or + * rollback when session is finished. + * <p> + * Cache store session listener allows to implement this and other + * scenarios providing to callback methods: + * <ul> + * <li> + * {@link #onSessionStart(CacheStoreSession)} - called + * before any store operation within a session is invoked. + * </li> + * <li> + * {@link #onSessionEnd(CacheStoreSession, boolean)} - called + * after all operations within a session are invoked. + * </li> + * </ul> + * <h2>Implementations</h2> + * Ignites provides several out-of-the-box implementations + * of session listener (refer to individual JavaDocs for more + * details): + * <ul> + * <li> + * {@link CacheStoreSessionJdbcListener} - JDBC-based session + * listener. For each session it gets a new JDBC connection from + * provided {@link DataSource} and commits (or rolls back) it + * when session ends. + * </li> + * <li> + * {@ignitelink org.apache.ignite.cache.store.spring.CacheStoreSessionSpringListener} - + * session listener based on Spring transaction management. + * It starts a new DB transaction for each session and commits + * (or rolls back) it when session ends. If there is no ongoing + * cache transaction, this listener is no-op. + * </li> + * <li> + * <@ignitelink org.apache.ignite.cache.store.hibernate.CacheStoreSessionHibernateListener} - + * Hibernate-based session listener. It creates a new Hibernate + * session for each Ignite session. If there is an ongoing cache + * transaction, a corresponding Hibernate transaction is created + * as well. + * </li> + * </ul> + * <h2>Configuration</h2> + * There are two ways to configure a session listener: + * <ul> + * <li> + * Provide a global listener for all caches via + * {@link IgniteConfiguration#setCacheStoreSessionListenerFactories(Factory[])} + * configuration property. This will we called for any store + * session, not depending on what caches participate in + * transaction. + * </li> + * <li> + * Provide a listener for a particular cache via + * {@link CacheConfiguration#setCacheStoreSessionListenerFactories(Factory[])} + * configuration property. This will be called only if the + * cache participates in transaction. + * </li> + * </ul> + * For example, here is how global {@link CacheStoreSessionJdbcListener} + * can be configured in Spring XML configuration file: + * <pre name="code" class="xml"> + * <bean class="org.apache.ignite.configuration.IgniteConfiguration"> + * ... + * + * <property name="CacheStoreSessionListenerFactories"> + * <list> + * <bean class="javax.cache.configuration.FactoryBuilder$SingletonFactory"> + * <constructor-arg> + * <bean class="org.apache.ignite.cache.store.jdbc.CacheStoreSessionJdbcListener"> + * <!-- Inject external data source. --> + * <property name="dataSource" ref="jdbc-data-source"/> + * </bean> + * </constructor-arg> + * </bean> + * </list> + * </property> + * </bean> + * </pre> */ public interface CacheStoreSessionListener { /** * On session start callback. + * <p> + * Called before any store operation within a session is invoked. * * @param ses Current session. */ @@ -30,9 +122,12 @@ public interface CacheStoreSessionListener { /** * On session end callback. + * <p> + * Called after all operations within a session are invoked. * * @param ses Current session. - * @param commit {@code True} if transaction should commit, {@code false} for rollback. + * @param commit {@code True} if persistence store transaction + * should commit, {@code false} for rollback. */ public void onSessionEnd(CacheStoreSession ses, boolean commit); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e47f85ac/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheStoreSessionJdbcListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheStoreSessionJdbcListener.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheStoreSessionJdbcListener.java index e4cd617..7920fae 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheStoreSessionJdbcListener.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheStoreSessionJdbcListener.java @@ -22,6 +22,7 @@ import org.apache.ignite.cache.store.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lifecycle.*; +import javax.cache.*; import javax.cache.integration.*; import javax.sql.*; import java.sql.*; @@ -29,6 +30,44 @@ import java.util.*; /** * Cache store session listener based on JDBC connection. + * <p> + * For each session this listener gets a new JDBC connection + * from provided {@link DataSource} and commits (or rolls + * back) it when session ends. + * <p> + * The connection is stored in store session + * {@link CacheStoreSession#properties() properties} and can + * be accessed at any moment by {@link #JDBC_CONN_KEY} key. + * The listener guarantees that the connection will be + * available for any store operation. If there is an + * ongoing cache transaction, all operations within this + * transaction will be committed or rolled back only when + * session ends. + * <p> + * As an example, here is how the {@link CacheStore#write(Cache.Entry)} + * method can be implemented if {@link CacheStoreSessionJdbcListener} + * is configured: + * <pre name="code" class="java"> + * private static class Store extends CacheStoreAdapter<Integer, Integer> { + * @CacheStoreSessionResource + * private CacheStoreSession ses; + * + * @Override public void write(Cache.Entry<? extends Integer, ? extends Integer> entry) throws CacheWriterException { + * // Get connection from the current session. + * Connection conn = ses.<String, Connection>properties().get(CacheStoreSessionJdbcListener.JDBC_CONN_KEY); + * + * // Execute update SQL query. + * try { + * conn.createStatement().executeUpdate("..."); + * } + * catch (SQLException e) { + * throw new CacheWriterException("Failed to update the store.", e); + * } + * } + * } + * </pre> + * JDBC connection will be automatically created by the listener + * at the start of the session and closed when it ends. */ public class CacheStoreSessionJdbcListener implements CacheStoreSessionListener, LifecycleAware { /** Session key for JDBC connection. */ @@ -39,12 +78,13 @@ public class CacheStoreSessionJdbcListener implements CacheStoreSessionListener, /** * Sets data source. + * <p> + * This is a required parameter. If data source is not set, + * exception will be thrown on startup. * * @param dataSrc Data source. */ public void setDataSource(DataSource dataSrc) { - A.notNull(dataSrc, "dataSrc"); - this.dataSrc = dataSrc; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e47f85ac/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java index 33a5711c..0018218 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java @@ -1742,6 +1742,7 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> { * Gets cache store session listener factories. * * @return Cache store session listener factories. + * @see CacheStoreSessionListener */ public Factory<? extends CacheStoreSessionListener>[] getCacheStoreSessionListenerFactories() { return storeSesLsnrs; @@ -1749,9 +1750,14 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> { /** * Cache store session listener factories. + * <p> + * These listeners override global listeners provided in + * {@link IgniteConfiguration#setCacheStoreSessionListenerFactories(Factory[])} + * configuration property. * * @param storeSesLsnrs Cache store session listener factories. * @return {@code this} for chaining. + * @see CacheStoreSessionListener */ public CacheConfiguration setCacheStoreSessionListenerFactories( Factory<? extends CacheStoreSessionListener>... storeSesLsnrs) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e47f85ac/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java index 96ac7e3..5d4c98f 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java @@ -2249,6 +2249,7 @@ public class IgniteConfiguration { * Gets cache store session listener factories. * * @return Cache store session listener factories. + * @see CacheStoreSessionListener */ public Factory<CacheStoreSessionListener>[] getCacheStoreSessionListenerFactories() { return storeSesLsnrs; @@ -2256,9 +2257,15 @@ public class IgniteConfiguration { /** * Cache store session listener factories. + * <p> + * These are global store session listeners, so they are applied to + * all caches. If you need to override listeners for a + * particular cache, use {@link CacheConfiguration#setCacheStoreSessionListenerFactories(Factory[])} + * configuration property. * * @param storeSesLsnrs Cache store session listener factories. * @return {@code this} for chaining. + * @see CacheStoreSessionListener */ public IgniteConfiguration setCacheStoreSessionListenerFactories( Factory<CacheStoreSessionListener>... storeSesLsnrs) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e47f85ac/modules/hibernate/src/main/java/org/apache/ignite/cache/store/hibernate/CacheStoreSessionHibernateListener.java ---------------------------------------------------------------------- diff --git a/modules/hibernate/src/main/java/org/apache/ignite/cache/store/hibernate/CacheStoreSessionHibernateListener.java b/modules/hibernate/src/main/java/org/apache/ignite/cache/store/hibernate/CacheStoreSessionHibernateListener.java index fe0960e..ea1214a 100644 --- a/modules/hibernate/src/main/java/org/apache/ignite/cache/store/hibernate/CacheStoreSessionHibernateListener.java +++ b/modules/hibernate/src/main/java/org/apache/ignite/cache/store/hibernate/CacheStoreSessionHibernateListener.java @@ -32,7 +32,45 @@ import java.net.*; import java.util.*; /** - * Cache store session listener based on Hibernate session. + * Hibernate-based cache store session listener. + * <p> + * This listener creates a new Hibernate session for each store + * session. If there is an ongoing cache transaction, a corresponding + * Hibernate transaction is created as well. + * <p> + * The Hibernate session is stored in store session + * {@link CacheStoreSession#properties() properties} and can + * be accessed at any moment by {@link #HIBERNATE_SES_KEY} key. + * The listener guarantees that the session will be + * available for any store operation. If there is an + * ongoing cache transaction, all operations within this + * transaction will share a DB transaction. + * <p> + * As an example, here is how the {@link CacheStore#write(javax.cache.Cache.Entry)} + * method can be implemented if {@link CacheStoreSessionHibernateListener} + * is configured: + * <pre name="code" class="java"> + * private static class Store extends CacheStoreAdapter<Integer, Integer> { + * @CacheStoreSessionResource + * private CacheStoreSession ses; + * + * @Override public void write(Cache.Entry<? extends Integer, ? extends Integer> entry) throws CacheWriterException { + * // Get Hibernate session from the current store session. + * Session hibSes = ses.<String, Session>properties().get(CacheStoreSessionHibernateListener.HIBERNATE_SES_KEY); + * + * // Persist the value. + * hibSes.persist(entry.getValue()); + * } + * } + * </pre> + * Hibernate session will be automatically created by the listener + * at the start of the session and closed when it ends. + * <p> + * {@link CacheStoreSessionHibernateListener} requires that either + * {@link #setSessionFactory(SessionFactory)} session factory} + * or {@link #setHibernateConfigurationPath(String) Hibernate configuration file} + * is provided. If non of them is set, exception is thrown. Is both are provided, + * session factory will be used. */ public class CacheStoreSessionHibernateListener implements CacheStoreSessionListener, LifecycleAware { /** Session key for JDBC connection. */ @@ -53,12 +91,13 @@ public class CacheStoreSessionHibernateListener implements CacheStoreSessionList /** * Sets Hibernate session factory. + * <p> + * Either session factory or configuration file is required. + * If none is provided, exception will be thrown on startup. * * @param sesFactory Session factory. */ public void setSessionFactory(SessionFactory sesFactory) { - A.notNull(sesFactory, "sesFactory"); - this.sesFactory = sesFactory; } @@ -73,6 +112,9 @@ public class CacheStoreSessionHibernateListener implements CacheStoreSessionList /** * Sets hibernate configuration path. + * <p> + * Either session factory or configuration file is required. + * If none is provided, exception will be thrown on startup. * * @param hibernateCfgPath Hibernate configuration path. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e47f85ac/modules/spring/src/main/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListener.java ---------------------------------------------------------------------- diff --git a/modules/spring/src/main/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListener.java b/modules/spring/src/main/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListener.java index 2fab4f0..e5201ba 100644 --- a/modules/spring/src/main/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListener.java +++ b/modules/spring/src/main/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListener.java @@ -23,6 +23,7 @@ import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lifecycle.*; import org.apache.ignite.resources.*; import org.apache.ignite.transactions.*; +import org.springframework.jdbc.core.*; import org.springframework.jdbc.datasource.*; import org.springframework.transaction.*; import org.springframework.transaction.support.*; @@ -31,7 +32,28 @@ import javax.cache.integration.*; import javax.sql.*; /** - * Cache store session listener based on Spring cache manager. + * Cache store session listener based on Spring transaction management. + * <p> + * This listener starts a new DB transaction for each session and commits + * or rolls it back when session ends. If there is no ongoing + * cache transaction, this listener is no-op. + * <p> + * Store implementation can use any Spring APIs like {@link JdbcTemplate} + * and others. The listener will guarantee that if there is an + * ongoing cache transaction, all store operations within this + * transaction will be automatically enlisted in the same database + * transaction. + * <p> + * {@link CacheStoreSessionSpringListener} requires that either + * {@link #setTransactionManager(PlatformTransactionManager) transaction manager} + * or {@link #setDataSource(DataSource) data source} is configured. If non of them is + * provided, exception is thrown. Is both are provided, data source will be + * ignored. + * <p> + * If there is a transaction, a {@link TransactionStatus} object will be stored + * in store session {@link CacheStoreSession#properties() properties} and can be + * accessed at any moment by {@link #TX_STATUS_KEY} key. This can be used to + * acquire current DB transaction status. */ public class CacheStoreSessionSpringListener implements CacheStoreSessionListener, LifecycleAware { /** Session key for transaction status. */ @@ -52,6 +74,9 @@ public class CacheStoreSessionSpringListener implements CacheStoreSessionListene /** * Sets transaction manager. + * <p> + * Either transaction manager or data source is required. + * If none is provided, exception will be thrown on startup. * * @param txMgr Transaction manager. */ @@ -70,6 +95,9 @@ public class CacheStoreSessionSpringListener implements CacheStoreSessionListene /** * Sets data source. + * <p> + * Either transaction manager or data source is required. + * If none is provided, exception will be thrown on startup. * * @param dataSrc Data source. */ @@ -88,6 +116,8 @@ public class CacheStoreSessionSpringListener implements CacheStoreSessionListene /** * Sets propagation behavior. + * <p> + * This parameter is optional. * * @param propagation Propagation behavior. */