IGNITE-891 - Cache store improvements
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b37d0046 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b37d0046 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b37d0046 Branch: refs/heads/ignite-sprint-5 Commit: b37d00465f1a162436082660ec69a1f765492373 Parents: 80ebfe0 Author: Valentin Kulichenko <vkuliche...@gridgain.com> Authored: Thu May 21 18:52:15 2015 -0700 Committer: Valentin Kulichenko <vkuliche...@gridgain.com> Committed: Thu May 21 18:52:15 2015 -0700 ---------------------------------------------------------------------- .../jdbc/CacheStoreSessionJdbcListener.java | 16 +++- .../processors/cache/GridCacheProcessor.java | 10 ++- .../cache/GridCacheSharedContext.java | 15 +++- .../processors/cache/GridCacheUtils.java | 33 +++++++ .../store/GridCacheStoreManagerAdapter.java | 44 +++------- .../cache/transactions/IgniteTxAdapter.java | 13 ++- .../transactions/IgniteTxLocalAdapter.java | 27 ++---- .../loadtests/hashmap/GridCacheTestContext.java | 3 +- .../CacheStoreSessionHibernateListener.java | 82 +++++++++++++++++- modules/spring/pom.xml | 12 +-- .../spring/CacheStoreSessionSpringListener.java | 90 ++++++++++++++++++-- ...CacheStoreSessionSpringListenerSelfTest.java | 2 +- 12 files changed, 271 insertions(+), 76 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b37d0046/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheStoreSessionJdbcListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheStoreSessionJdbcListener.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheStoreSessionJdbcListener.java index c683abe..e4cd617 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheStoreSessionJdbcListener.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheStoreSessionJdbcListener.java @@ -17,9 +17,10 @@ package org.apache.ignite.cache.store.jdbc; +import org.apache.ignite.*; import org.apache.ignite.cache.store.*; import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.resources.*; +import org.apache.ignite.lifecycle.*; import javax.cache.integration.*; import javax.sql.*; @@ -29,7 +30,7 @@ import java.util.*; /** * Cache store session listener based on JDBC connection. */ -public class CacheStoreSessionJdbcListener implements CacheStoreSessionListener { +public class CacheStoreSessionJdbcListener implements CacheStoreSessionListener, LifecycleAware { /** Session key for JDBC connection. */ public static final String JDBC_CONN_KEY = "__jdbc_conn_"; @@ -57,6 +58,17 @@ public class CacheStoreSessionJdbcListener implements CacheStoreSessionListener } /** {@inheritDoc} */ + @Override public void start() throws IgniteException { + if (dataSrc == null) + throw new IgniteException("Data source is required by " + getClass().getSimpleName() + '.'); + } + + /** {@inheritDoc} */ + @Override public void stop() throws IgniteException { + // No-op. + } + + /** {@inheritDoc} */ @Override public void onSessionStart(CacheStoreSession ses) { Map<String, Connection> props = ses.properties(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b37d0046/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 0e1a9c2..5b57817 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -567,7 +567,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { CacheConfiguration[] cfgs = ctx.config().getCacheConfiguration(); - sharedCtx = createSharedContext(ctx); + sharedCtx = createSharedContext(ctx, CU.createStoreSessionListeners(ctx, + ctx.config().getCacheStoreSessionListenerFactories())); ctx.performance().add("Disable serializable transactions (set 'txSerializableEnabled' to false)", !ctx.config().getTransactionConfiguration().isTxSerializableEnabled()); @@ -1562,10 +1563,12 @@ public class GridCacheProcessor extends GridProcessorAdapter { * Creates shared context. * * @param kernalCtx Kernal context. + * @param storeSesLsnrs Store session listeners. * @return Shared context. */ @SuppressWarnings("unchecked") - private GridCacheSharedContext createSharedContext(GridKernalContext kernalCtx) { + private GridCacheSharedContext createSharedContext(GridKernalContext kernalCtx, + Collection<CacheStoreSessionListener> storeSesLsnrs) { IgniteTxManager tm = new IgniteTxManager(); GridCacheMvccManager mvccMgr = new GridCacheMvccManager(); GridCacheVersionManager verMgr = new GridCacheVersionManager(); @@ -1580,7 +1583,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { mvccMgr, depMgr, exchMgr, - ioMgr + ioMgr, + storeSesLsnrs ); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b37d0046/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java index b16885e..45634b4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.*; +import org.apache.ignite.cache.store.*; import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; @@ -77,6 +78,9 @@ public class GridCacheSharedContext<K, V> { /** Preloaders start future. */ private IgniteInternalFuture<Object> preloadersStartFut; + /** Store session listeners. */ + private Collection<CacheStoreSessionListener> storeSesLsnrs; + /** * @param txMgr Transaction manager. * @param verMgr Version manager. @@ -89,7 +93,8 @@ public class GridCacheSharedContext<K, V> { GridCacheMvccManager mvccMgr, GridCacheDeploymentManager<K, V> depMgr, GridCachePartitionExchangeManager<K, V> exchMgr, - GridCacheIoManager ioMgr + GridCacheIoManager ioMgr, + Collection<CacheStoreSessionListener> storeSesLsnrs ) { this.kernalCtx = kernalCtx; this.mvccMgr = add(mvccMgr); @@ -98,6 +103,7 @@ public class GridCacheSharedContext<K, V> { this.depMgr = add(depMgr); this.exchMgr = add(exchMgr); this.ioMgr = add(ioMgr); + this.storeSesLsnrs = storeSesLsnrs; txMetrics = new TransactionMetricsAdapter(); @@ -524,6 +530,13 @@ public class GridCacheSharedContext<K, V> { } /** + * @return Store session listeners. + */ + @Nullable public Collection<CacheStoreSessionListener> storeSessionListeners() { + return storeSesLsnrs; + } + + /** * @param mgr Manager to add. * @return Added manager. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b37d0046/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java index 549f42f..6968fcb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.cache.affinity.*; +import org.apache.ignite.cache.store.*; import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; @@ -34,12 +35,14 @@ import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; +import org.apache.ignite.lifecycle.*; import org.apache.ignite.plugin.*; import org.apache.ignite.transactions.*; import org.jetbrains.annotations.*; import org.jsr166.*; import javax.cache.*; +import javax.cache.configuration.*; import javax.cache.expiry.*; import javax.cache.integration.*; import java.io.*; @@ -1790,4 +1793,34 @@ public class GridCacheUtils { return res; } + + /** + * Creates store session listeners. + * + * @param ctx Kernal context. + * @param factories Factories. + * @return Listeners. + */ + public static Collection<CacheStoreSessionListener> createStoreSessionListeners(GridKernalContext ctx, + Factory<CacheStoreSessionListener>[] factories) throws IgniteCheckedException { + if (factories == null) + return null; + + Collection<CacheStoreSessionListener> lsnrs = new ArrayList<>(factories.length); + + for (Factory<CacheStoreSessionListener> factory : factories) { + CacheStoreSessionListener lsnr = factory.create(); + + if (lsnr != null) { + ctx.resource().injectGeneric(lsnr); + + if (lsnr instanceof LifecycleAware) + ((LifecycleAware)lsnr).start(); + + lsnrs.add(lsnr); + } + } + + return lsnrs; + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b37d0046/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java index 79ac86d..8096291 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java @@ -35,7 +35,6 @@ import org.apache.ignite.transactions.*; import org.jetbrains.annotations.*; import javax.cache.*; -import javax.cache.configuration.*; import javax.cache.integration.*; import java.util.*; @@ -167,39 +166,10 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt "Persistence store is configured, but both read-through and write-through are disabled."); } - sesLsnrs = createSessionListeners(cfg.getCacheStoreSessionListenerFactories()); + sesLsnrs = CU.createStoreSessionListeners(cctx.kernalContext(), cfg.getCacheStoreSessionListenerFactories()); if (sesLsnrs == null) - sesLsnrs = createSessionListeners(cctx.kernalContext().config().getCacheStoreSessionListenerFactories()); - } - - /** - * Creates session listeners. - * - * @param factories Factories. - * @return Listeners. - */ - private Collection<CacheStoreSessionListener> createSessionListeners(Factory<CacheStoreSessionListener>[] factories) - throws IgniteCheckedException { - if (factories == null) - return null; - - Collection<CacheStoreSessionListener> lsnrs = new ArrayList<>(factories.length); - - for (Factory<CacheStoreSessionListener> factory : factories) { - CacheStoreSessionListener lsnr = factory.create(); - - if (lsnr != null) { - cctx.kernalContext().resource().injectGeneric(lsnr); - - if (lsnr instanceof LifecycleAware) - ((LifecycleAware)lsnr).start(); - - lsnrs.add(lsnr); - } - } - - return lsnrs; + sesLsnrs = cctx.shared().storeSessionListeners(); } /** {@inheritDoc} */ @@ -754,6 +724,11 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt if (!sesHolder.get().storeEnded(store)) store.sessionEnd(commit); } + catch (Throwable e) { + last = true; + + throw e; + } finally { if (last && sesHolder != null) { sesHolder.set(null); @@ -834,8 +809,9 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt lsnr.onSessionEnd(locSes, !threwEx); } - if (!sesHolder.get().storeEnded(store)) - store.sessionEnd(!threwEx); + assert !sesHolder.get().storeEnded(store); + + store.sessionEnd(!threwEx); } } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b37d0046/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index f6d5d90..adc1c86 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -408,9 +408,18 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter if (!storeEnabled()) return false; - Collection<CacheStoreManager> stores = stores(); + Collection<Integer> cacheIds = activeCacheIds(); + + if (!cacheIds.isEmpty()) { + for (int cacheId : cacheIds) { + CacheStoreManager store = cctx.cacheContext(cacheId).store(); - return stores != null && !stores.isEmpty(); + if (store.configured()) + return true; + } + } + + return false; } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b37d0046/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index 1bed2da..fdaef47 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -18,10 +18,10 @@ package org.apache.ignite.internal.processors.cache.transactions; import org.apache.ignite.*; -import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.cluster.*; import org.apache.ignite.internal.managers.communication.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.near.*; import org.apache.ignite.internal.processors.cache.dr.*; @@ -1010,24 +1010,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter cctx.tm().resetContext(); } } - else if (!internal()) { - Collection<CacheStoreManager> stores = stores(); - - if (stores != null && !stores.isEmpty()) { - try { - sessionEnd(stores, true); - } - catch (IgniteCheckedException e) { - commitError(e); - - setRollbackOnly(); - - cctx.tm().removeCommittedTx(this); - - throw e; - } - } - } // Do not unlock transaction entries if one-phase commit. if (!onePhaseCommit()) { @@ -1119,7 +1101,12 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter if (!internal()) { Collection<CacheStoreManager> stores = stores(); - if (stores != null && !stores.isEmpty() && (near() || F.first(stores).isWriteToStoreFromDht())) + assert isWriteToStoreFromDhtValid(stores) : + "isWriteToStoreFromDht can't be different within one transaction"; + + boolean isWriteToStoreFromDht = F.first(stores).isWriteToStoreFromDht(); + + if (stores != null && !stores.isEmpty() && (near() || isWriteToStoreFromDht)) sessionEnd(stores, false); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b37d0046/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java index 0b0f099..1c85ed3 100644 --- a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java @@ -54,7 +54,8 @@ public class GridCacheTestContext<K, V> extends GridCacheContext<K, V> { new GridCacheMvccManager(), new GridCacheDeploymentManager<K, V>(), new GridCachePartitionExchangeManager<K, V>(), - new GridCacheIoManager() + new GridCacheIoManager(), + null ), defaultCacheConfiguration(), CacheType.USER, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b37d0046/modules/hibernate/src/main/java/org/apache/ignite/cache/store/hibernate/CacheStoreSessionHibernateListener.java ---------------------------------------------------------------------- diff --git a/modules/hibernate/src/main/java/org/apache/ignite/cache/store/hibernate/CacheStoreSessionHibernateListener.java b/modules/hibernate/src/main/java/org/apache/ignite/cache/store/hibernate/CacheStoreSessionHibernateListener.java index fc9eb91..fe0960e 100644 --- a/modules/hibernate/src/main/java/org/apache/ignite/cache/store/hibernate/CacheStoreSessionHibernateListener.java +++ b/modules/hibernate/src/main/java/org/apache/ignite/cache/store/hibernate/CacheStoreSessionHibernateListener.java @@ -17,23 +17,40 @@ package org.apache.ignite.cache.store.hibernate; +import org.apache.ignite.*; import org.apache.ignite.cache.store.*; +import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lifecycle.*; +import org.apache.ignite.resources.*; import org.hibernate.*; +import org.hibernate.cfg.*; import javax.cache.integration.*; +import java.io.*; +import java.net.*; import java.util.*; /** * Cache store session listener based on Hibernate session. */ -public class CacheStoreSessionHibernateListener implements CacheStoreSessionListener { +public class CacheStoreSessionHibernateListener implements CacheStoreSessionListener, LifecycleAware { /** Session key for JDBC connection. */ public static final String HIBERNATE_SES_KEY = "__hibernate_ses_"; /** Hibernate session factory. */ private SessionFactory sesFactory; + /** Hibernate configuration file path. */ + private String hibernateCfgPath; + + /** Logger. */ + @LoggerResource + private IgniteLogger log; + + /** Whether to close session on stop. */ + private boolean closeSesOnStop; + /** * Sets Hibernate session factory. * @@ -54,6 +71,69 @@ public class CacheStoreSessionHibernateListener implements CacheStoreSessionList return sesFactory; } + /** + * Sets hibernate configuration path. + * + * @param hibernateCfgPath Hibernate configuration path. + */ + public void setHibernateConfigurationPath(String hibernateCfgPath) { + this.hibernateCfgPath = hibernateCfgPath; + } + + /** + * Gets hibernate configuration path. + * + * @return Hibernate configuration path. + */ + public String getHibernateConfigurationPath() { + return hibernateCfgPath; + } + + /** {@inheritDoc} */ + @SuppressWarnings("deprecation") + @Override public void start() throws IgniteException { + if (sesFactory == null && F.isEmpty(hibernateCfgPath)) + throw new IgniteException("Either session factory or Hibernate configuration file is required by " + + getClass().getSimpleName() + '.'); + + if (!F.isEmpty(hibernateCfgPath)) { + if (sesFactory == null) { + try { + URL url = new URL(hibernateCfgPath); + + sesFactory = new Configuration().configure(url).buildSessionFactory(); + } + catch (MalformedURLException ignored) { + // No-op. + } + + if (sesFactory == null) { + File cfgFile = new File(hibernateCfgPath); + + if (cfgFile.exists()) + sesFactory = new Configuration().configure(cfgFile).buildSessionFactory(); + } + + if (sesFactory == null) + sesFactory = new Configuration().configure(hibernateCfgPath).buildSessionFactory(); + + if (sesFactory == null) + throw new IgniteException("Failed to resolve Hibernate configuration file: " + hibernateCfgPath); + + closeSesOnStop = true; + } + else + U.warn(log, "Hibernate configuration file configured in " + getClass().getSimpleName() + + " will be ignored (session factory is already set)."); + } + } + + /** {@inheritDoc} */ + @Override public void stop() throws IgniteException { + if (closeSesOnStop && sesFactory != null && !sesFactory.isClosed()) + sesFactory.close(); + } + /** {@inheritDoc} */ @Override public void onSessionStart(CacheStoreSession ses) { Map<String, Session> props = ses.properties(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b37d0046/modules/spring/pom.xml ---------------------------------------------------------------------- diff --git a/modules/spring/pom.xml b/modules/spring/pom.xml index 2633c83..f49a23d 100644 --- a/modules/spring/pom.xml +++ b/modules/spring/pom.xml @@ -77,6 +77,12 @@ </dependency> <dependency> + <groupId>org.springframework</groupId> + <artifactId>spring-jdbc</artifactId> + <version>${spring.version}</version> + </dependency> + + <dependency> <groupId>commons-logging</groupId> <artifactId>commons-logging</artifactId> <version>1.1.1</version> @@ -104,12 +110,6 @@ <scope>test</scope> </dependency> - <dependency> - <groupId>org.springframework</groupId> - <artifactId>spring-jdbc</artifactId> - <version>${spring.version}</version> - <scope>test</scope> - </dependency> <dependency> <groupId>com.h2database</groupId> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b37d0046/modules/spring/src/main/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListener.java ---------------------------------------------------------------------- diff --git a/modules/spring/src/main/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListener.java b/modules/spring/src/main/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListener.java index e0caad5..2fab4f0 100644 --- a/modules/spring/src/main/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListener.java +++ b/modules/spring/src/main/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListener.java @@ -17,32 +17,45 @@ package org.apache.ignite.cache.store.spring; +import org.apache.ignite.*; import org.apache.ignite.cache.store.*; import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lifecycle.*; +import org.apache.ignite.resources.*; import org.apache.ignite.transactions.*; +import org.springframework.jdbc.datasource.*; import org.springframework.transaction.*; import org.springframework.transaction.support.*; import javax.cache.integration.*; +import javax.sql.*; /** * Cache store session listener based on Spring cache manager. */ -public class CacheStoreSessionSpringListener implements CacheStoreSessionListener { +public class CacheStoreSessionSpringListener implements CacheStoreSessionListener, LifecycleAware { /** Session key for transaction status. */ public static final String TX_STATUS_KEY = "__spring_tx_status_"; /** Transaction manager. */ private PlatformTransactionManager txMgr; + /** Data source. */ + private DataSource dataSrc; + + /** Propagation behavior. */ + private int propagation = TransactionDefinition.PROPAGATION_REQUIRED; + + /** Logger. */ + @LoggerResource + private IgniteLogger log; + /** * Sets transaction manager. * * @param txMgr Transaction manager. */ public void setTransactionManager(PlatformTransactionManager txMgr) { - A.notNull(txMgr, "txMgr"); - this.txMgr = txMgr; } @@ -55,11 +68,71 @@ public class CacheStoreSessionSpringListener implements CacheStoreSessionListene return txMgr; } + /** + * Sets data source. + * + * @param dataSrc Data source. + */ + public void setDataSource(DataSource dataSrc) { + this.dataSrc = dataSrc; + } + + /** + * Gets data source. + * + * @return Data source. + */ + public DataSource getDataSource() { + return dataSrc; + } + + /** + * Sets propagation behavior. + * + * @param propagation Propagation behavior. + */ + public void setPropagationBehavior(int propagation) { + this.propagation = propagation; + } + + /** + * Gets propagation behavior. + * + * @return Propagation behavior. + */ + public int getPropagationBehavior() { + return propagation; + } + + /** {@inheritDoc} */ + @Override public void start() throws IgniteException { + if (txMgr == null && dataSrc == null) + throw new IgniteException("Either transaction manager or data source is required by " + + getClass().getSimpleName() + '.'); + + if (dataSrc != null) { + if (txMgr == null) + txMgr = new DataSourceTransactionManager(dataSrc); + else + U.warn(log, "Data source configured in " + getClass().getSimpleName() + + " will be ignored (transaction manager is already set)."); + } + + assert txMgr != null; + } + + /** {@inheritDoc} */ + @Override public void stop() throws IgniteException { + // No-op. + } + /** {@inheritDoc} */ @Override public void onSessionStart(CacheStoreSession ses) { if (ses.isWithinTransaction()) { try { - ses.properties().put(TX_STATUS_KEY, txMgr.getTransaction(definition(ses.transaction()))); + TransactionDefinition def = definition(ses.transaction(), ses.cacheName()); + + ses.properties().put(TX_STATUS_KEY, txMgr.getTransaction(def)); } catch (TransactionException e) { throw new CacheWriterException("Failed to start store session [tx=" + ses.transaction() + ']', e); @@ -91,12 +164,19 @@ public class CacheStoreSessionSpringListener implements CacheStoreSessionListene * * @return DB transaction isolation. */ - private TransactionDefinition definition(Transaction tx) { + private TransactionDefinition definition(Transaction tx, String cacheName) { assert tx != null; DefaultTransactionDefinition def = new DefaultTransactionDefinition(); + def.setName("Ignite Tx [cache=" + (cacheName != null ? cacheName : "<default>") + ", id=" + tx.xid() + ']'); def.setIsolationLevel(isolationLevel(tx.isolation())); + def.setPropagationBehavior(propagation); + + long timeoutSec = (tx.timeout() + 500) / 1000; + + if (timeoutSec > 0 && timeoutSec < Integer.MAX_VALUE) + def.setTimeout((int)timeoutSec); return def; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b37d0046/modules/spring/src/test/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListenerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/spring/src/test/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListenerSelfTest.java b/modules/spring/src/test/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListenerSelfTest.java index 79d5b5e..83ed249 100644 --- a/modules/spring/src/test/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListenerSelfTest.java +++ b/modules/spring/src/test/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListenerSelfTest.java @@ -54,7 +54,7 @@ public class CacheStoreSessionSpringListenerSelfTest extends CacheStoreSessionLi @Override public CacheStoreSessionListener create() { CacheStoreSessionSpringListener lsnr = new CacheStoreSessionSpringListener(); - lsnr.setTransactionManager(new DataSourceTransactionManager(DATA_SRC)); + lsnr.setDataSource(DATA_SRC); return lsnr; }