http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/990bf9e3/modules/spring/src/main/java/org/apache/ignite/cache/store/spring/CacheSpringStoreSessionListener.java ---------------------------------------------------------------------- diff --git a/modules/spring/src/main/java/org/apache/ignite/cache/store/spring/CacheSpringStoreSessionListener.java b/modules/spring/src/main/java/org/apache/ignite/cache/store/spring/CacheSpringStoreSessionListener.java new file mode 100644 index 0000000..81736cd --- /dev/null +++ b/modules/spring/src/main/java/org/apache/ignite/cache/store/spring/CacheSpringStoreSessionListener.java @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.cache.store.spring; + +import org.apache.ignite.*; +import org.apache.ignite.cache.store.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lifecycle.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.transactions.*; +import org.springframework.jdbc.core.*; +import org.springframework.jdbc.datasource.*; +import org.springframework.transaction.*; +import org.springframework.transaction.support.*; + +import javax.cache.integration.*; +import javax.sql.*; + +/** + * Cache store session listener based on Spring transaction management. + * <p> + * This listener starts a new DB transaction for each session and commits + * or rolls it back when session ends. If there is no ongoing + * cache transaction, this listener is no-op. + * <p> + * Store implementation can use any Spring APIs like {@link JdbcTemplate} + * and others. The listener will guarantee that if there is an + * ongoing cache transaction, all store operations within this + * transaction will be automatically enlisted in the same database + * transaction. + * <p> + * {@link CacheSpringStoreSessionListener} requires that either + * {@link #setTransactionManager(PlatformTransactionManager) transaction manager} + * or {@link #setDataSource(DataSource) data source} is configured. If none of them is + * provided, exception is thrown. If both are provided, data source will be + * ignored. + * <p> + * If there is a transaction, a {@link TransactionStatus} object will be saved + * as a store session {@link CacheStoreSession#attachment() attachment}. It + * can be used to acquire current DB transaction status. + */ +public class CacheSpringStoreSessionListener implements CacheStoreSessionListener, LifecycleAware { + /** Transaction manager. */ + private PlatformTransactionManager txMgr; + + /** Data source. */ + private DataSource dataSrc; + + /** Propagation behavior. 
*/ + private int propagation = TransactionDefinition.PROPAGATION_REQUIRED; + + /** Logger. */ + @LoggerResource + private IgniteLogger log; + + /** + * Sets transaction manager. + * <p> + * Either transaction manager or data source is required. + * If none is provided, exception will be thrown on startup. + * + * @param txMgr Transaction manager. + */ + public void setTransactionManager(PlatformTransactionManager txMgr) { + this.txMgr = txMgr; + } + + /** + * Gets transaction manager. + * + * @return Transaction manager. + */ + public PlatformTransactionManager getTransactionManager() { + return txMgr; + } + + /** + * Sets data source. + * <p> + * Either transaction manager or data source is required. + * If none is provided, exception will be thrown on startup. + * + * @param dataSrc Data source. + */ + public void setDataSource(DataSource dataSrc) { + this.dataSrc = dataSrc; + } + + /** + * Gets data source. + * + * @return Data source. + */ + public DataSource getDataSource() { + return dataSrc; + } + + /** + * Sets propagation behavior. + * <p> + * This parameter is optional. + * + * @param propagation Propagation behavior. + */ + public void setPropagationBehavior(int propagation) { + this.propagation = propagation; + } + + /** + * Gets propagation behavior. + * + * @return Propagation behavior. 
+ */ + public int getPropagationBehavior() { + return propagation; + } + + /** {@inheritDoc} */ + @Override public void start() throws IgniteException { + if (txMgr == null && dataSrc == null) + throw new IgniteException("Either transaction manager or data source is required by " + + getClass().getSimpleName() + '.'); + + if (dataSrc != null) { + if (txMgr == null) + txMgr = new DataSourceTransactionManager(dataSrc); + else + U.warn(log, "Data source configured in " + getClass().getSimpleName() + + " will be ignored (transaction manager is already set)."); + } + + assert txMgr != null; + } + + /** {@inheritDoc} */ + @Override public void stop() throws IgniteException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void onSessionStart(CacheStoreSession ses) { + if (ses.isWithinTransaction()) { + try { + TransactionDefinition def = definition(ses.transaction(), ses.cacheName()); + + ses.attach(txMgr.getTransaction(def)); + } + catch (TransactionException e) { + throw new CacheWriterException("Failed to start store session [tx=" + ses.transaction() + ']', e); + } + } + } + + /** {@inheritDoc} */ + @Override public void onSessionEnd(CacheStoreSession ses, boolean commit) { + if (ses.isWithinTransaction()) { + TransactionStatus tx = ses.attachment(); + + if (tx != null) { + ses.attach(null); + + try { + if (commit) + txMgr.commit(tx); + else + txMgr.rollback(tx); + } + catch (TransactionException e) { + throw new CacheWriterException("Failed to end store session [tx=" + ses.transaction() + ']', e); + } + } + } + } + + /** + * Creates Spring transaction definition based on ongoing cache transaction. + * + * @return Transaction definition. + */ + private TransactionDefinition definition(Transaction tx, String cacheName) { + assert tx != null; + + DefaultTransactionDefinition def = new DefaultTransactionDefinition(); + + def.setName("Ignite Tx [cache=" + (cacheName != null ? 
cacheName : "<default>") + ", id=" + tx.xid() + ']'); + def.setIsolationLevel(isolationLevel(tx.isolation())); + def.setPropagationBehavior(propagation); + + long timeoutSec = (tx.timeout() + 500) / 1000; + + if (timeoutSec > 0 && timeoutSec < Integer.MAX_VALUE) + def.setTimeout((int)timeoutSec); + + return def; + } + + /** + * Gets DB transaction isolation level based on ongoing cache transaction isolation. + * + * @param isolation Cache transaction isolation. + * @return DB transaction isolation. + */ + private int isolationLevel(TransactionIsolation isolation) { + switch (isolation) { + case READ_COMMITTED: + return TransactionDefinition.ISOLATION_READ_COMMITTED; + + case REPEATABLE_READ: + return TransactionDefinition.ISOLATION_REPEATABLE_READ; + + case SERIALIZABLE: + return TransactionDefinition.ISOLATION_SERIALIZABLE; + + default: + throw new IllegalStateException(); // Will never happen. + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/990bf9e3/modules/spring/src/main/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListener.java ---------------------------------------------------------------------- diff --git a/modules/spring/src/main/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListener.java b/modules/spring/src/main/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListener.java deleted file mode 100644 index e5201ba..0000000 --- a/modules/spring/src/main/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListener.java +++ /dev/null @@ -1,235 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.cache.store.spring; - -import org.apache.ignite.*; -import org.apache.ignite.cache.store.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lifecycle.*; -import org.apache.ignite.resources.*; -import org.apache.ignite.transactions.*; -import org.springframework.jdbc.core.*; -import org.springframework.jdbc.datasource.*; -import org.springframework.transaction.*; -import org.springframework.transaction.support.*; - -import javax.cache.integration.*; -import javax.sql.*; - -/** - * Cache store session listener based on Spring transaction management. - * <p> - * This listener starts a new DB transaction for each session and commits - * or rolls it back when session ends. If there is no ongoing - * cache transaction, this listener is no-op. - * <p> - * Store implementation can use any Spring APIs like {@link JdbcTemplate} - * and others. The listener will guarantee that if there is an - * ongoing cache transaction, all store operations within this - * transaction will be automatically enlisted in the same database - * transaction. - * <p> - * {@link CacheStoreSessionSpringListener} requires that either - * {@link #setTransactionManager(PlatformTransactionManager) transaction manager} - * or {@link #setDataSource(DataSource) data source} is configured. If non of them is - * provided, exception is thrown. Is both are provided, data source will be - * ignored. - * <p> - * If there is a transaction, a {@link TransactionStatus} object will be stored - * in store session {@link CacheStoreSession#properties() properties} and can be - * accessed at any moment by {@link #TX_STATUS_KEY} key. This can be used to - * acquire current DB transaction status. - */ -public class CacheStoreSessionSpringListener implements CacheStoreSessionListener, LifecycleAware { - /** Session key for transaction status. */ - public static final String TX_STATUS_KEY = "__spring_tx_status_"; - - /** Transaction manager. 
*/ - private PlatformTransactionManager txMgr; - - /** Data source. */ - private DataSource dataSrc; - - /** Propagation behavior. */ - private int propagation = TransactionDefinition.PROPAGATION_REQUIRED; - - /** Logger. */ - @LoggerResource - private IgniteLogger log; - - /** - * Sets transaction manager. - * <p> - * Either transaction manager or data source is required. - * If none is provided, exception will be thrown on startup. - * - * @param txMgr Transaction manager. - */ - public void setTransactionManager(PlatformTransactionManager txMgr) { - this.txMgr = txMgr; - } - - /** - * Gets transaction manager. - * - * @return Transaction manager. - */ - public PlatformTransactionManager getTransactionManager() { - return txMgr; - } - - /** - * Sets data source. - * <p> - * Either transaction manager or data source is required. - * If none is provided, exception will be thrown on startup. - * - * @param dataSrc Data source. - */ - public void setDataSource(DataSource dataSrc) { - this.dataSrc = dataSrc; - } - - /** - * Gets data source. - * - * @return Data source. - */ - public DataSource getDataSource() { - return dataSrc; - } - - /** - * Sets propagation behavior. - * <p> - * This parameter is optional. - * - * @param propagation Propagation behavior. - */ - public void setPropagationBehavior(int propagation) { - this.propagation = propagation; - } - - /** - * Gets propagation behavior. - * - * @return Propagation behavior. 
- */ - public int getPropagationBehavior() { - return propagation; - } - - /** {@inheritDoc} */ - @Override public void start() throws IgniteException { - if (txMgr == null && dataSrc == null) - throw new IgniteException("Either transaction manager or data source is required by " + - getClass().getSimpleName() + '.'); - - if (dataSrc != null) { - if (txMgr == null) - txMgr = new DataSourceTransactionManager(dataSrc); - else - U.warn(log, "Data source configured in " + getClass().getSimpleName() + - " will be ignored (transaction manager is already set)."); - } - - assert txMgr != null; - } - - /** {@inheritDoc} */ - @Override public void stop() throws IgniteException { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void onSessionStart(CacheStoreSession ses) { - if (ses.isWithinTransaction()) { - try { - TransactionDefinition def = definition(ses.transaction(), ses.cacheName()); - - ses.properties().put(TX_STATUS_KEY, txMgr.getTransaction(def)); - } - catch (TransactionException e) { - throw new CacheWriterException("Failed to start store session [tx=" + ses.transaction() + ']', e); - } - } - } - - /** {@inheritDoc} */ - @Override public void onSessionEnd(CacheStoreSession ses, boolean commit) { - if (ses.isWithinTransaction()) { - TransactionStatus tx = ses.<String, TransactionStatus>properties().remove(TX_STATUS_KEY); - - if (tx != null) { - try { - if (commit) - txMgr.commit(tx); - else - txMgr.rollback(tx); - } - catch (TransactionException e) { - throw new CacheWriterException("Failed to end store session [tx=" + ses.transaction() + ']', e); - } - } - } - } - - /** - * Gets DB transaction isolation level based on ongoing cache transaction isolation. - * - * @return DB transaction isolation. - */ - private TransactionDefinition definition(Transaction tx, String cacheName) { - assert tx != null; - - DefaultTransactionDefinition def = new DefaultTransactionDefinition(); - - def.setName("Ignite Tx [cache=" + (cacheName != null ? 
cacheName : "<default>") + ", id=" + tx.xid() + ']'); - def.setIsolationLevel(isolationLevel(tx.isolation())); - def.setPropagationBehavior(propagation); - - long timeoutSec = (tx.timeout() + 500) / 1000; - - if (timeoutSec > 0 && timeoutSec < Integer.MAX_VALUE) - def.setTimeout((int)timeoutSec); - - return def; - } - - /** - * Gets DB transaction isolation level based on ongoing cache transaction isolation. - * - * @param isolation Cache transaction isolation. - * @return DB transaction isolation. - */ - private int isolationLevel(TransactionIsolation isolation) { - switch (isolation) { - case READ_COMMITTED: - return TransactionDefinition.ISOLATION_READ_COMMITTED; - - case REPEATABLE_READ: - return TransactionDefinition.ISOLATION_REPEATABLE_READ; - - case SERIALIZABLE: - return TransactionDefinition.ISOLATION_SERIALIZABLE; - - default: - throw new IllegalStateException(); // Will never happen. - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/990bf9e3/modules/spring/src/test/java/org/apache/ignite/cache/store/spring/CacheSpringStoreSessionListenerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/spring/src/test/java/org/apache/ignite/cache/store/spring/CacheSpringStoreSessionListenerSelfTest.java b/modules/spring/src/test/java/org/apache/ignite/cache/store/spring/CacheSpringStoreSessionListenerSelfTest.java new file mode 100644 index 0000000..74f5c69 --- /dev/null +++ b/modules/spring/src/test/java/org/apache/ignite/cache/store/spring/CacheSpringStoreSessionListenerSelfTest.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.store.spring; + +import org.apache.ignite.cache.store.*; +import org.apache.ignite.cache.store.jdbc.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.resources.*; +import org.springframework.jdbc.core.*; +import org.springframework.jdbc.datasource.*; +import org.springframework.transaction.*; + +import javax.cache.*; +import javax.cache.configuration.*; +import javax.cache.integration.*; +import javax.sql.*; +import java.sql.*; +import java.util.*; + +/** + * Tests for {@link CacheSpringStoreSessionListener}. + */ +public class CacheSpringStoreSessionListenerSelfTest extends CacheStoreSessionListenerAbstractSelfTest { + /** */ + private static final DataSource DATA_SRC = new DriverManagerDataSource(URL); + + /** {@inheritDoc} */ + @Override protected Factory<? 
extends CacheStore<Integer, Integer>> storeFactory() { + return new Factory<CacheStore<Integer, Integer>>() { + @Override public CacheStore<Integer, Integer> create() { + return new Store(new JdbcTemplate(DATA_SRC)); + } + }; + } + + /** {@inheritDoc} */ + @Override protected Factory<CacheStoreSessionListener> sessionListenerFactory() { + return new Factory<CacheStoreSessionListener>() { + @Override public CacheStoreSessionListener create() { + CacheSpringStoreSessionListener lsnr = new CacheSpringStoreSessionListener(); + + lsnr.setDataSource(DATA_SRC); + + return lsnr; + } + }; + } + + /** + */ + private static class Store extends CacheStoreAdapter<Integer, Integer> { + /** */ + private static String SES_CONN_KEY = "ses_conn"; + + /** */ + private final JdbcTemplate jdbc; + + /** */ + @CacheStoreSessionResource + private CacheStoreSession ses; + + /** + * @param jdbc JDBC template. + */ + private Store(JdbcTemplate jdbc) { + this.jdbc = jdbc; + } + + /** {@inheritDoc} */ + @Override public void loadCache(IgniteBiInClosure<Integer, Integer> clo, Object... args) { + loadCacheCnt.incrementAndGet(); + + checkTransaction(); + checkConnection(); + } + + /** {@inheritDoc} */ + @Override public Integer load(Integer key) throws CacheLoaderException { + loadCnt.incrementAndGet(); + + checkTransaction(); + checkConnection(); + + return null; + } + + /** {@inheritDoc} */ + @Override public void write(Cache.Entry<? extends Integer, ? 
extends Integer> entry) + throws CacheWriterException { + writeCnt.incrementAndGet(); + + checkTransaction(); + checkConnection(); + + if (write.get()) { + String table; + + switch (ses.cacheName()) { + case "cache1": + table = "Table1"; + + break; + + case "cache2": + if (fail.get()) + throw new CacheWriterException("Expected failure."); + + table = "Table2"; + + break; + + default: + throw new CacheWriterException("Wring cache: " + ses.cacheName()); + } + + jdbc.update("INSERT INTO " + table + " (key, value) VALUES (?, ?)", + entry.getKey(), entry.getValue()); + } + } + + /** {@inheritDoc} */ + @Override public void delete(Object key) throws CacheWriterException { + deleteCnt.incrementAndGet(); + + checkTransaction(); + checkConnection(); + } + + /** {@inheritDoc} */ + @Override public void sessionEnd(boolean commit) { + assertNull(ses.attachment()); + } + + /** + */ + private void checkTransaction() { + TransactionStatus tx = ses.attachment(); + + if (ses.isWithinTransaction()) { + assertNotNull(tx); + assertFalse(tx.isCompleted()); + } + else + assertNull(tx); + } + + /** + */ + private void checkConnection() { + Connection conn = DataSourceUtils.getConnection(jdbc.getDataSource()); + + assertNotNull(conn); + + try { + assertFalse(conn.isClosed()); + assertEquals(!ses.isWithinTransaction(), conn.getAutoCommit()); + } + catch (SQLException e) { + throw new RuntimeException(e); + } + + verifySameInstance(conn); + } + + /** + * @param conn Connection. 
+ */ + private void verifySameInstance(Connection conn) { + Map<String, Connection> props = ses.properties(); + + Connection sesConn = props.get(SES_CONN_KEY); + + if (sesConn == null) + props.put(SES_CONN_KEY, conn); + else { + assertSame(conn, sesConn); + + reuseCnt.incrementAndGet(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/990bf9e3/modules/spring/src/test/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListenerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/spring/src/test/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListenerSelfTest.java b/modules/spring/src/test/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListenerSelfTest.java deleted file mode 100644 index 83ed249..0000000 --- a/modules/spring/src/test/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListenerSelfTest.java +++ /dev/null @@ -1,204 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.cache.store.spring; - -import org.apache.ignite.cache.store.*; -import org.apache.ignite.cache.store.jdbc.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.resources.*; -import org.springframework.jdbc.core.*; -import org.springframework.jdbc.datasource.*; -import org.springframework.transaction.*; - -import javax.cache.*; -import javax.cache.configuration.*; -import javax.cache.integration.*; -import javax.sql.*; -import java.sql.*; -import java.util.*; - -/** - * Tests for {@link CacheStoreSessionJdbcListener}. - */ -public class CacheStoreSessionSpringListenerSelfTest extends CacheStoreSessionListenerAbstractSelfTest { - /** */ - private static final DataSource DATA_SRC = new DriverManagerDataSource(URL); - - /** {@inheritDoc} */ - @Override protected Factory<? extends CacheStore<Integer, Integer>> storeFactory() { - return new Factory<CacheStore<Integer, Integer>>() { - @Override public CacheStore<Integer, Integer> create() { - return new Store(new JdbcTemplate(DATA_SRC)); - } - }; - } - - /** {@inheritDoc} */ - @Override protected Factory<CacheStoreSessionListener> sessionListenerFactory() { - return new Factory<CacheStoreSessionListener>() { - @Override public CacheStoreSessionListener create() { - CacheStoreSessionSpringListener lsnr = new CacheStoreSessionSpringListener(); - - lsnr.setDataSource(DATA_SRC); - - return lsnr; - } - }; - } - - /** - */ - private static class Store extends CacheStoreAdapter<Integer, Integer> { - /** */ - private static String SES_CONN_KEY = "ses_conn"; - - /** */ - private final JdbcTemplate jdbc; - - /** */ - @CacheStoreSessionResource - private CacheStoreSession ses; - - /** - * @param jdbc JDBC template. - */ - private Store(JdbcTemplate jdbc) { - this.jdbc = jdbc; - } - - /** {@inheritDoc} */ - @Override public void loadCache(IgniteBiInClosure<Integer, Integer> clo, Object... 
args) { - loadCacheCnt.incrementAndGet(); - - checkTransaction(); - checkConnection(); - } - - /** {@inheritDoc} */ - @Override public Integer load(Integer key) throws CacheLoaderException { - loadCnt.incrementAndGet(); - - checkTransaction(); - checkConnection(); - - return null; - } - - /** {@inheritDoc} */ - @Override public void write(Cache.Entry<? extends Integer, ? extends Integer> entry) - throws CacheWriterException { - writeCnt.incrementAndGet(); - - checkTransaction(); - checkConnection(); - - if (write.get()) { - String table; - - switch (ses.cacheName()) { - case "cache1": - table = "Table1"; - - break; - - case "cache2": - if (fail.get()) - throw new CacheWriterException("Expected failure."); - - table = "Table2"; - - break; - - default: - throw new CacheWriterException("Wring cache: " + ses.cacheName()); - } - - jdbc.update("INSERT INTO " + table + " (key, value) VALUES (?, ?)", - entry.getKey(), entry.getValue()); - } - } - - /** {@inheritDoc} */ - @Override public void delete(Object key) throws CacheWriterException { - deleteCnt.incrementAndGet(); - - checkTransaction(); - checkConnection(); - } - - /** {@inheritDoc} */ - @Override public void sessionEnd(boolean commit) { - assertNull(transaction()); - } - - /** - */ - private void checkTransaction() { - TransactionStatus tx = transaction(); - - if (ses.isWithinTransaction()) { - assertNotNull(tx); - assertFalse(tx.isCompleted()); - } - else - assertNull(tx); - } - - /** - * @return Transaction status. 
- */ - private TransactionStatus transaction() { - return ses.<String, TransactionStatus>properties().get(CacheStoreSessionSpringListener.TX_STATUS_KEY); - } - - /** - */ - private void checkConnection() { - Connection conn = DataSourceUtils.getConnection(jdbc.getDataSource()); - - assertNotNull(conn); - - try { - assertFalse(conn.isClosed()); - assertEquals(!ses.isWithinTransaction(), conn.getAutoCommit()); - } - catch (SQLException e) { - throw new RuntimeException(e); - } - - verifySameInstance(conn); - } - - /** - * @param conn Connection. - */ - private void verifySameInstance(Connection conn) { - Map<String, Connection> props = ses.properties(); - - Connection sesConn = props.get(SES_CONN_KEY); - - if (sesConn == null) - props.put(SES_CONN_KEY, conn); - else { - assertSame(conn, sesConn); - - reuseCnt.incrementAndGet(); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/990bf9e3/modules/spring/src/test/java/org/apache/ignite/testsuites/IgniteSpringTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/spring/src/test/java/org/apache/ignite/testsuites/IgniteSpringTestSuite.java b/modules/spring/src/test/java/org/apache/ignite/testsuites/IgniteSpringTestSuite.java index 0b7e471..12dd494 100644 --- a/modules/spring/src/test/java/org/apache/ignite/testsuites/IgniteSpringTestSuite.java +++ b/modules/spring/src/test/java/org/apache/ignite/testsuites/IgniteSpringTestSuite.java @@ -48,7 +48,7 @@ public class IgniteSpringTestSuite extends TestSuite { suite.addTest(new TestSuite(IgniteStartFromStreamConfigurationTest.class)); - suite.addTestSuite(CacheStoreSessionSpringListenerSelfTest.class); + suite.addTestSuite(CacheSpringStoreSessionListenerSelfTest.class); return suite; }