# IGNITE-32: Initial commit.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/410f9e5e Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/410f9e5e Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/410f9e5e Branch: refs/heads/ignite-32 Commit: 410f9e5e448fce21a59fc94031dd52d3a9bb8d4b Parents: ee7bb3f Author: AKuznetsov <akuznet...@gridgain.com> Authored: Wed Jan 21 17:40:11 2015 +0700 Committer: AKuznetsov <akuznet...@gridgain.com> Committed: Wed Jan 21 17:40:11 2015 +0700 ---------------------------------------------------------------------- .../ignite/cache/store/jdbc/JdbcCacheStore.java | 1160 +++++++++++ .../cache/store/jdbc/JdbcPojoCacheStore.java | 188 ++ .../store/jdbc/dialect/BasicJdbcDialect.java | 267 +++ .../cache/store/jdbc/dialect/DB2Dialect.java | 68 + .../cache/store/jdbc/dialect/H2Dialect.java | 41 + .../cache/store/jdbc/dialect/MySQLDialect.java | 48 + .../cache/store/jdbc/dialect/OracleDialect.java | 72 + .../store/jdbc/dialect/SQLServerDialect.java | 67 + .../cache/store/jdbc/dialect/package.html | 24 + .../query/GridCacheQueryTypeDescriptor.java | 135 ++ .../cache/query/GridCacheQueryTypeMetadata.java | 147 +- ...AbstractCacheStoreMultithreadedSelfTest.java | 127 ++ .../PojoCacheStoreMultitreadedSelfTest.java | 106 + .../store/jdbc/PojoCacheStoreSelfTest.java | 551 +++++ .../cache/store/jdbc/model/Organization.java | 153 ++ .../cache/store/jdbc/model/OrganizationKey.java | 96 + .../ignite/cache/store/jdbc/model/Person.java | 153 ++ .../cache/store/jdbc/model/PersonKey.java | 96 + .../ignite/cache/store/jdbc/model/package.html | 24 + modules/schema-load/pom.xml | 91 + .../main/java/media/data_connection_48x48.png | Bin 0 -> 4443 bytes .../src/main/java/media/error_48x48.png | Bin 0 -> 4349 bytes .../src/main/java/media/ignite_128x128.png | Bin 0 -> 4917 bytes .../src/main/java/media/ignite_16x16.png | Bin 0 -> 608 bytes .../src/main/java/media/ignite_24x24.png | Bin 0 -> 930 bytes .../src/main/java/media/ignite_32x32.png | Bin 0 -> 1203 bytes .../src/main/java/media/ignite_48x48.png | Bin 0 -> 1868 bytes .../src/main/java/media/ignite_64x64.png | Bin 0 -> 2453 bytes .../src/main/java/media/information_48x48.png | Bin 0 -> 4102 bytes .../src/main/java/media/navigate_down_24x24.png | Bin 0 -> 861 bytes .../src/main/java/media/navigate_up_24x24.png | Bin 0 -> 852 bytes .../src/main/java/media/question_48x48.png | Bin 0 -> 3857 bytes .../src/main/java/media/sign_warning_48x48.png | Bin 0 -> 2988 bytes .../schema-load/src/main/java/media/style.css | 58 + .../src/main/java/media/text_tree_48x48.png | Bin 0 -> 2567 bytes .../ignite/schema/generator/PojoGenerator.java | 316 +++ .../ignite/schema/generator/XmlGenerator.java | 320 +++ .../ignite/schema/ui/ConfirmCallable.java | 81 + .../org/apache/ignite/schema/ui/Controls.java | 377 ++++ .../org/apache/ignite/schema/ui/GridPaneEx.java | 177 ++ .../org/apache/ignite/schema/ui/MessageBox.java | 238 +++ .../apache/ignite/schema/ui/ModalDialog.java | 50 + .../apache/ignite/schema/ui/SchemaLoadApp.java | 1898 ++++++++++++++++++ pom.xml | 1 + 44 files changed, 7119 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/410f9e5e/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcCacheStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcCacheStore.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcCacheStore.java new file mode 100644 index 0000000..3de4ae4 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcCacheStore.java @@ -0,0 +1,1160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.store.jdbc; + +import org.apache.ignite.*; +import org.apache.ignite.cache.store.*; +import org.apache.ignite.cache.store.jdbc.dialect.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.transactions.*; +import org.gridgain.grid.cache.query.*; +import org.gridgain.grid.util.tostring.*; +import org.gridgain.grid.util.typedef.*; +import org.gridgain.grid.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import javax.cache.*; +import javax.cache.integration.*; +import javax.sql.*; +import java.net.*; +import java.sql.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +/** + * Base {@link CacheStore} implementation backed by JDBC. This implementation stores objects in underlying database + * using mapping description. + */ +public abstract class JdbcCacheStore<K, V> extends CacheStore<K, V> { + /** + * Query cache by type. + */ + protected static class QueryCache { + /** Database dialect. */ + protected final BasicJdbcDialect dialect; + + /** Select all items query. */ + protected final String loadCacheQry; + + /** Select item query. */ + protected final String loadQrySingle; + + /** Select items query. */ + private final String loadQry; + + /** Merge item(s) query. */ + protected final String mergeQry; + + /** Update item query. */ + protected final String insQry; + + /** Update item query. */ + protected final String updQry; + + /** Remove item(s) query. */ + protected final String remQry; + + /** Max key count for load query per statement. */ + protected final int maxKeysPerStmt; + + /** Database key columns. */ + private final Collection<String> keyCols; + + /** Database unique value columns. */ + private final Collection<String> cols; + + /** Unique value fields. */ + private final Collection<GridCacheQueryTypeDescriptor> uniqValFields; + + /** Type metadata. */ + private final GridCacheQueryTypeMetadata typeMetadata; + + /** + * @param typeMetadata Type metadata. + */ + public QueryCache(BasicJdbcDialect dialect, GridCacheQueryTypeMetadata typeMetadata) { + this.dialect = dialect; + + this.typeMetadata = typeMetadata; + + final Collection<GridCacheQueryTypeDescriptor> keyFields = typeMetadata.getKeyDescriptors(); + + Collection<GridCacheQueryTypeDescriptor> valFields = typeMetadata.getValueDescriptors(); + + uniqValFields = F.view(typeMetadata.getValueDescriptors(), + new IgnitePredicate<GridCacheQueryTypeDescriptor>() { + @Override public boolean apply(GridCacheQueryTypeDescriptor desc) { + return !keyFields.contains(desc); + } + }); + + String schema = typeMetadata.getSchema(); + + String tblName = typeMetadata.getTableName(); + + keyCols = databaseColumns(keyFields); + + Collection<String> valCols = databaseColumns(valFields); + + Collection<String> uniqValCols = databaseColumns(uniqValFields); + + loadCacheQry = dialect.loadCacheQuery(schema, tblName, F.concat(false, keyCols, uniqValCols)); + + loadQrySingle = dialect.loadQuery(schema, tblName, keyCols, valCols, 1); + + maxKeysPerStmt = dialect.getMaxParamsCnt() / keyCols.size(); + + loadQry = dialect.loadQuery(schema, tblName, keyCols, uniqValCols, maxKeysPerStmt); + + insQry = dialect.insertQuery(schema, tblName, keyCols, uniqValCols); + + updQry = dialect.updateQuery(schema, tblName, keyCols, uniqValCols); + + mergeQry = dialect.mergeQuery(schema, tblName, keyCols, uniqValCols); + + remQry = dialect.removeQuery(schema, tblName, keyCols); + + cols = F.concat(false, keyCols, valCols); + } + + /** + * Construct query for select values with key count less or equal {@code maxKeysPerStmt} + * + * @param keyCnt Key count. + */ + protected String loadQuery(int keyCnt) { + assert keyCnt <= maxKeysPerStmt; + + if (keyCnt == maxKeysPerStmt) + return loadQry; + + if (keyCnt == 1) + return loadQrySingle; + + return dialect.loadQuery(typeMetadata.getSchema(), typeMetadata.getTableName(), keyCols, cols, keyCnt); + } + + /** Key type. */ + protected String keyType() { + return typeMetadata.getKeyType(); + } + + /** Value type. */ + protected String valueType() { + return typeMetadata.getType(); + } + + /** + * Gets key fields type descriptors. + * + * @return Key fields type descriptors. + */ + protected Collection<GridCacheQueryTypeDescriptor> keyDescriptors() { + return typeMetadata.getKeyDescriptors(); + } + + /** + * Gets value fields type descriptors. + * + * @return Key value type descriptors. + */ + protected Collection<GridCacheQueryTypeDescriptor> valueDescriptors() { + return typeMetadata.getValueDescriptors(); + } + } + + /** Default batch size for put and remove operations. */ + protected static final int DFLT_BATCH_SIZE = 512; + + /** Connection attribute name. */ + protected static final String ATTR_CONN = "JDBC_STORE_CONNECTION"; + + /** Auto-injected grid instance. */ + @IgniteInstanceResource + protected Ignite ignite; + + /** Auto-injected logger instance. */ + @IgniteLoggerResource + protected IgniteLogger log; + + /** Init guard. */ + @GridToStringExclude + private final AtomicBoolean initGuard = new AtomicBoolean(); + + /** Init latch. */ + @GridToStringExclude + private final CountDownLatch initLatch = new CountDownLatch(1); + + /** Successful initialization flag. */ + private boolean initOk; + + /** Data source. */ + protected DataSource dataSrc; + + /** Connection URL. */ + protected String connUrl; + + /** User name for database access. */ + protected String user; + + /** Password for database access. */ + @GridToStringExclude + protected String passwd; + + /** Execute. */ + protected ExecutorService exec; + + /** Paths to xml with type mapping description. */ + protected Collection<String> typeMetadataPaths; + + /** Type mapping description. */ + protected Collection<GridCacheQueryTypeMetadata> typeMetadata; + + /** Cache with query by type. */ + protected Map<Object, QueryCache> entryQtyCache; + + /** Database dialect. */ + protected BasicJdbcDialect dialect; + + /** Max workers thread count. These threads are responsible for execute query. */ + protected int maxPoolSz = Runtime.getRuntime().availableProcessors(); + + /** Maximum batch size for put and remove operations. */ + protected int batchSz = DFLT_BATCH_SIZE; + + /** + * Perform dialect resolution. + * + * @return The resolved dialect. + * @throws IgniteCheckedException Indicates problems accessing the metadata. + */ + protected BasicJdbcDialect resolveDialect() throws IgniteCheckedException { + Connection conn = null; + + String dbProductName = null; + + try { + conn = openConnection(false); + + dbProductName = conn.getMetaData().getDatabaseProductName(); + } + catch (SQLException e) { + throw new IgniteCheckedException("Failed access to metadata for detect database dialect.", e); + } + finally { + closeConnection(conn); + } + + if ("H2".equals(dbProductName)) + return new H2Dialect(); + + if ("MySQL".equals(dbProductName)) + return new MySQLDialect(); + + if (dbProductName.startsWith("Microsoft SQL Server")) + return new SQLServerDialect(); + + if ("Oracle".equals(dbProductName)) + return new OracleDialect(); + + if (dbProductName.startsWith("DB2/")) + return new DB2Dialect(); + + return new BasicJdbcDialect(); + } + + /** + * Initializes store. + * + * @throws IgniteCheckedException If failed to initialize. + */ + protected void init() throws IgniteCheckedException { + if (initLatch.getCount() > 0) { + if (initGuard.compareAndSet(false, true)) { + if (log.isDebugEnabled()) + log.debug("Initializing cache store."); + + if (dataSrc == null && F.isEmpty(connUrl)) + throw new IgniteCheckedException("Failed to initialize cache store (connection is not provided)."); + + if (dialect == null) + dialect = resolveDialect(); + + try { + if (typeMetadata == null) { + if (typeMetadataPaths == null) + throw new IgniteCheckedException( + "Failed to initialize cache store (metadata paths is not provided)."); + +// TODO: IGNITE-32 Replace with reading from config. +// GridSpringProcessor spring = SPRING.create(false); + + Collection<GridCacheQueryTypeMetadata> typeMeta = new ArrayList<>(); + + for (String path : typeMetadataPaths) { + URL url = U.resolveGridGainUrl(path); +// TODO: IGNITE-32 Replace with reading from config. +// if (url != null) { +// Map<String, Object> beans = spring.loadBeans(url, GridCacheQueryTypeMetadata.class). +// get(GridCacheQueryTypeMetadata.class); +// +// if (beans != null) +// for (Object bean : beans.values()) +// if (bean instanceof GridCacheQueryTypeMetadata) +// typeMeta.add((GridCacheQueryTypeMetadata)bean); +// } +// else + log.warning("Failed to resolve metadata path: " + path); + } + + setTypeMetadata(typeMeta); + } + + exec = Executors.newFixedThreadPool(maxPoolSz); + + buildTypeCache(); + + initOk = true; + } + finally { + initLatch.countDown(); + } + } + else + U.await(initLatch); + } + + if (!initOk) + throw new IgniteCheckedException("Cache store was not properly initialized."); + } + + /** + * Closes allocated resources depending on transaction status. + * + * @param tx Active transaction, if any. + * @param conn Allocated connection. + * @param st Created statement, + */ + protected void end(@Nullable IgniteTx tx, @Nullable Connection conn, @Nullable Statement st) { + U.closeQuiet(st); + + if (tx == null) + // Close connection right away if there is no transaction. + closeConnection(conn); + } + + /** + * Gets connection from a pool. + * + * @param autocommit {@code true} If connection should use autocommit mode. + * @return Pooled connection. + * @throws SQLException In case of error. + */ + protected Connection openConnection(boolean autocommit) throws SQLException { + Connection conn = dataSrc != null ? dataSrc.getConnection() : + DriverManager.getConnection(connUrl, user, passwd); + + conn.setAutoCommit(autocommit); + + return conn; + } + + /** + * Closes connection. + * + * @param conn Connection to close. + */ + protected void closeConnection(@Nullable Connection conn) { + U.closeQuiet(conn); + } + + /** + * @param tx Cache transaction. + * @return Connection. + * @throws SQLException In case of error. + */ + protected Connection connection(@Nullable IgniteTx tx) throws SQLException { + if (tx != null) { + Connection conn = null;// TODO: IGNITE-32 FIXME tx.meta(ATTR_CONN); + + if (conn == null) { + conn = openConnection(false); + + // Store connection in transaction metadata, so it can be accessed + // for other operations on the same transaction. + // TODO: IGNITE-32 FIXME tx.addMeta(ATTR_CONN, conn); + } + + return conn; + } + // Transaction can be null in case of simple load operation. + else + return openConnection(true); + } + + /** {@inheritDoc} */ + public void txEnd(IgniteTx tx, boolean commit) throws IgniteCheckedException { + init(); + + Connection conn = null; // TODO: IGNITE-32 FIXME tx.removeMeta(ATTR_CONN); + + if (conn != null) { + try { + if (commit) + conn.commit(); + else + conn.rollback(); + } + catch (SQLException e) { + throw new IgniteCheckedException( + "Failed to end transaction [xid=" + tx.xid() + ", commit=" + commit + ']', e); + } + finally { + closeConnection(conn); + } + } + + if (log.isDebugEnabled()) + log.debug("Transaction ended [xid=" + tx.xid() + ", commit=" + commit + ']'); + } + + /** + * Extract database column names from {@link GridCacheQueryTypeDescriptor}. + * + * @param dsc collection of {@link GridCacheQueryTypeDescriptor}. + */ + protected static Collection<String> databaseColumns(Collection<GridCacheQueryTypeDescriptor> dsc) { + return F.transform(dsc, new C1<GridCacheQueryTypeDescriptor, String>() { + /** {@inheritDoc} */ + @Override public String apply(GridCacheQueryTypeDescriptor desc) { + return desc.getDbName(); + } + }); + } + + /** + * Get field value from object. + * + * @param typeName Type name. + * @param fieldName Field name. + * @param obj Cache object. + * @return Field value from object. + */ + @Nullable protected abstract Object extractField(String typeName, String fieldName, Object obj) + throws IgniteCheckedException; + + /** + * Construct object from query result. + * + * @param <R> Type of result object. + * @param typeName Type name. + * @param fields Fields descriptors. + * @param rs ResultSet. + * @return Constructed object. + */ + protected abstract <R> R buildObject(String typeName, Collection<GridCacheQueryTypeDescriptor> fields, ResultSet rs) + throws IgniteCheckedException; + + /** + * Extract type key from object. + * + * @param key Key object. + * @return Type key. + */ + protected abstract Object typeKey(K key); + + /** + * Build cache for mapped types. + * + * @throws IgniteCheckedException If failed to initialize. + */ + protected abstract void buildTypeCache() throws IgniteCheckedException; + + /** {@inheritDoc} */ + @Override public void loadCache(final IgniteBiInClosure<K, V> clo, @Nullable Object... args) + throws CacheLoaderException { + try { + init(); + + if (log.isDebugEnabled()) + log.debug("Loading all values from db"); + + Collection<Future<?>> futs = new ArrayList<>(); + + for (final QueryCache type : entryQtyCache.values()) + futs.add(exec.submit(new Callable<Void>() { + @Override public Void call() throws Exception { + Connection conn = null; + + try { + PreparedStatement stmt = null; + + try { + conn = connection(null); + + stmt = conn.prepareStatement(type.loadCacheQry); + + ResultSet rs = stmt.executeQuery(); + + while (rs.next()) { + K key = buildObject(type.keyType(), type.keyDescriptors(), rs); + V val = buildObject(type.valueType(), type.valueDescriptors(), rs); + + clo.apply(key, val); + } + } + catch (SQLException e) { + throw new IgniteCheckedException("Failed to load cache", e); + } + finally { + U.closeQuiet(stmt); + } + } + finally { + closeConnection(conn); + } + + return null; + } + })); + + for (Future<?> fut : futs) + U.get(fut); + } + catch (Exception e) { + throw new CacheException(e); + } + } + + /** + * @param stmt Prepare statement. + * @param i Start index for parameters. + * @param type Type description. + * @param key Key object. + * @return Next index for parameters. + */ + protected int fillKeyParameters(PreparedStatement stmt, int i, QueryCache type, + K key) throws IgniteCheckedException { + for (GridCacheQueryTypeDescriptor field : type.keyDescriptors()) { + Object fieldVal = extractField(type.keyType(), field.getJavaName(), key); + + try { + if (fieldVal != null) + stmt.setObject(i++, fieldVal); + else + stmt.setNull(i++, field.getDbType()); + } + catch (SQLException e) { + throw new IgniteCheckedException("Failed to set statement parameter name: " + field.getDbName(), e); + } + } + + return i; + } + + /** + * @param stmt Prepare statement. + * @param type Type description. + * @param key Key object. + * @return Next index for parameters. + */ + protected int fillKeyParameters(PreparedStatement stmt, QueryCache type, K key) throws IgniteCheckedException { + return fillKeyParameters(stmt, 1, type, key); + } + + /** + * @param stmt Prepare statement. + * @param i Start index for parameters. + * @param type Type description. + * @param val Value object. + * @return Next index for parameters. + */ + protected int fillValueParameters(PreparedStatement stmt, int i, QueryCache type, V val) + throws IgniteCheckedException { + for (GridCacheQueryTypeDescriptor field : type.uniqValFields) { + Object fieldVal = extractField(type.valueType(), field.getJavaName(), val); + + try { + if (fieldVal != null) + stmt.setObject(i++, fieldVal); + else + stmt.setNull(i++, field.getDbType()); + } + catch (SQLException e) { + throw new IgniteCheckedException("Failed to set statement parameter name: " + field.getDbName(), e); + } + } + + return i; + } + + /** {@inheritDoc} */ + @Nullable public V load(@Nullable IgniteTx tx, K key) throws IgniteCheckedException { + init(); + + QueryCache type = entryQtyCache.get(typeKey(key)); + + if (type == null) + throw new IgniteCheckedException("Failed to find mapping description for type: " + key.getClass()); + + if (log.isDebugEnabled()) + log.debug("Start load value from db by key: " + key); + + Connection conn = null; + + PreparedStatement stmt = null; + + try { + conn = connection(tx); + + stmt = conn.prepareStatement(type.loadQrySingle); + + fillKeyParameters(stmt, type, key); + + ResultSet rs = stmt.executeQuery(); + + if (rs.next()) + return buildObject(type.valueType(), type.valueDescriptors(), rs); + } + catch (SQLException e) { + throw new IgniteCheckedException("Failed to load object by key: " + key, e); + } + finally { + end(tx, conn, stmt); + } + + return null; + } + + /** + * Loads all values for given keys with same type and passes every value to the provided closure. + * + * @param tx Cache transaction, if write-behind is not enabled, null otherwise. + * @param qry Query cache for type. + * @param keys Collection of keys to load. + * @param c Closure to call for every loaded element. + * @throws IgniteCheckedException If load failed. + */ + protected void loadAll(@Nullable IgniteTx tx, QueryCache qry, Collection<? extends K> keys, + IgniteBiInClosure<K, V> c) throws IgniteCheckedException { + init(); + + Connection conn = null; + + PreparedStatement stmt = null; + + try { + conn = connection(tx); + + stmt = conn.prepareStatement(qry.loadQuery(keys.size())); + + int i = 1; + + for (K key : keys) { + for (GridCacheQueryTypeDescriptor field : qry.keyDescriptors()) { + Object fieldVal = extractField(qry.keyType(), field.getJavaName(), key); + + if (fieldVal != null) + stmt.setObject(i++, fieldVal); + else + stmt.setNull(i++, field.getDbType()); + } + } + + ResultSet rs = stmt.executeQuery(); + + while (rs.next()) { + K key = buildObject(qry.keyType(), qry.keyDescriptors(), rs); + V val = buildObject(qry.valueType(), qry.valueDescriptors(), rs); + + c.apply(key, val); + } + } + catch (SQLException e) { + throw new IgniteCheckedException("Failed to load objects", e); + } + finally { + end(tx, conn, stmt); + } + } + + /** {@inheritDoc} */ + public void loadAll(@Nullable final IgniteTx tx, Collection<? extends K> keys, + final IgniteBiInClosure<K, V> c) throws IgniteCheckedException { + assert keys != null; + + init(); + + Map<QueryCache, Collection<K>> splittedKeys = U.newHashMap(entryQtyCache.size()); + + final Collection<Future<?>> futs = new ArrayList<>(); + + for (K key : keys) { + final QueryCache qry = entryQtyCache.get(typeKey(key)); + + Collection<K> batch = splittedKeys.get(qry); + + if (batch == null) + splittedKeys.put(qry, batch = new ArrayList<>()); + + batch.add(key); + + if (batch.size() == qry.maxKeysPerStmt) { + final Collection<K> p = splittedKeys.remove(qry); + + futs.add(exec.submit(new Callable<Void>() { + @Override public Void call() throws Exception { + loadAll(tx, qry, p, c); + + return null; + } + })); + } + } + + for (final Map.Entry<QueryCache, Collection<K>> entry : splittedKeys.entrySet()) + futs.add(exec.submit(new Callable<Void>() { + @Override public Void call() throws Exception { + loadAll(tx, entry.getKey(), entry.getValue(), c); + + return null; + } + })); + + for (Future<?> fut : futs) + U.get(fut); + } + + /** {@inheritDoc} */ + public void put(@Nullable IgniteTx tx, K key, V val) throws IgniteCheckedException { + init(); + + QueryCache type = entryQtyCache.get(typeKey(key)); + + if (type == null) + throw new IgniteCheckedException("Failed to find metadata for type: " + key.getClass()); + + if (log.isDebugEnabled()) + log.debug("Start put value in db: (" + key + ", " + val); + + Connection conn = null; + + PreparedStatement stmt = null; + + try { + conn = connection(tx); + + if (dialect.hasMerge()) { + stmt = conn.prepareStatement(type.mergeQry); + + int i = fillKeyParameters(stmt, type, key); + + fillValueParameters(stmt, i, type, val); + + stmt.executeUpdate(); + } + else { + stmt = conn.prepareStatement(type.updQry); + + int i = fillValueParameters(stmt, 1, type, val); + + fillKeyParameters(stmt, i, type, key); + + if (stmt.executeUpdate() == 0) { + stmt.close(); + + stmt = conn.prepareStatement(type.insQry); + + i = fillKeyParameters(stmt, type, key); + + fillValueParameters(stmt, i, type, val); + + stmt.executeUpdate(); + } + } + } + catch (SQLException e) { + throw new IgniteCheckedException("Failed to put object by key: " + key, e); + } + finally { + end(tx, conn, stmt); + } + } + + /** + * Stores given key value pairs in persistent storage. + * + * @param tx Cache transaction, if write-behind is not enabled, null otherwise. + * @param qry Query cache for type. + * @param map Values to store. + * @throws IgniteCheckedException If store failed. + */ + /** {@inheritDoc} */ + protected void putAll(@Nullable IgniteTx tx, QueryCache qry, Iterable<Map.Entry<? extends K, ? extends V>> map) + throws IgniteCheckedException { + assert map != null; + + init(); + + Connection conn = null; + + PreparedStatement stmt = null; + + try { + conn = connection(tx); + + stmt = conn.prepareStatement(qry.mergeQry); + + int cnt = 0; + + for (Map.Entry<? extends K, ? extends V> entry : map) { + int i = fillKeyParameters(stmt, qry, entry.getKey()); + + fillValueParameters(stmt, i, qry, entry.getValue()); + + stmt.addBatch(); + + if (cnt++ % batchSz == 0) + stmt.executeBatch(); + } + + if (cnt % batchSz != 0) + stmt.executeBatch(); + } + catch (SQLException e) { + throw new IgniteCheckedException("Failed to put objects", e); + } + finally { + end(tx, conn, stmt); + } + } + + /** {@inheritDoc} */ + public void putAll(@Nullable final IgniteTx tx, Map<? extends K, ? extends V> map) + throws IgniteCheckedException { + assert map != null; + + init(); + + Map<Object, Collection<Map.Entry<? extends K, ? extends V>>> keyByType = U.newHashMap(entryQtyCache.size()); + + if (dialect.hasMerge()) { + for (Map.Entry<? extends K, ? extends V> entry : map.entrySet()) { + Object typeKey = typeKey(entry.getKey()); + + Collection<Map.Entry<? extends K, ? extends V>> batch = keyByType.get(typeKey); + + if (batch == null) + keyByType.put(typeKey, batch = new ArrayList<>()); + + batch.add(entry); + } + + final Collection<Future<?>> futs = new ArrayList<>(); + + for (final Map.Entry<Object, Collection<Map.Entry<? extends K, ? extends V>>> e : keyByType.entrySet()) { + final QueryCache qry = entryQtyCache.get(e.getKey()); + + futs.add(exec.submit(new Callable<Void>() { + @Override public Void call() throws Exception { + putAll(tx, qry, e.getValue()); + + return null; + } + })); + } + + for (Future<?> fut : futs) + U.get(fut); + } + else + for (Map.Entry<? extends K, ? extends V> e : map.entrySet()) + put(tx, e.getKey(), e.getValue()); + } + + /** {@inheritDoc} */ + public void remove(@Nullable IgniteTx tx, K key) throws IgniteCheckedException { + init(); + + QueryCache type = entryQtyCache.get(typeKey(key)); + + if (type == null) + throw new IgniteCheckedException("Failed to find metadata for type: " + key.getClass()); + + if (log.isDebugEnabled()) + log.debug("Start remove value from db by key: " + key); + + Connection conn = null; + + PreparedStatement stmt = null; + + try { + conn = connection(tx); + + stmt = conn.prepareStatement(type.remQry); + + fillKeyParameters(stmt, type, key); + + stmt.executeUpdate(); + } + catch (SQLException e) { + throw new IgniteCheckedException("Failed to load object by key: " + key, e); + } + finally { + end(tx, conn, stmt); + } + } + + /** + * Removes all vales identified by given keys from persistent storage. + * + * @param tx Cache transaction, if write-behind is not enabled, null otherwise. + * @param qry Query cache for type. + * @param keys Collection of keys to remove. + * @throws IgniteCheckedException If remove failed. + */ + protected void removeAll(@Nullable IgniteTx tx, QueryCache qry, Collection<? extends K> keys) + throws IgniteCheckedException { + assert keys != null && !keys.isEmpty(); + + init(); + + if (log.isDebugEnabled()) + log.debug("Start remove values by keys: " + Arrays.toString(keys.toArray())); + + Connection conn = null; + + PreparedStatement stmt = null; + + try { + conn = connection(tx); + + stmt = conn.prepareStatement(qry.remQry); + + int cnt = 0; + + for (K key : keys) { + fillKeyParameters(stmt, qry, key); + + stmt.addBatch(); + + if (cnt++ % batchSz == 0) + stmt.executeBatch(); + } + + if (cnt % batchSz != 0) + stmt.executeBatch(); + } + catch (SQLException e) { + throw new IgniteCheckedException("Failed to remove values by keys.", e); + } + finally { + end(tx, conn, stmt); + } + } + + /** {@inheritDoc} */ + public void removeAll(@Nullable IgniteTx tx, Collection<? extends K> keys) throws IgniteCheckedException { + assert keys != null; + + Map<Object, Collection<K>> keyByType = U.newHashMap(entryQtyCache.size()); + + for (K key : keys) { + Object typeKey = typeKey(key); + + Collection<K> batch = keyByType.get(typeKey); + + if (batch == null) + keyByType.put(typeKey, batch = new ArrayList<>()); + + batch.add(key); + } + + for (Map.Entry<Object, Collection<K>> entry : keyByType.entrySet()) { + QueryCache qry = entryQtyCache.get(entry.getKey()); + + removeAll(tx, qry, entry.getValue()); + } + } + + /** + * @return Data source. + */ + public DataSource getDataSource() { + return dataSrc; + } + + /** + * @param dataSrc Data source. + */ + public void setDataSource(DataSource dataSrc) { + this.dataSrc = dataSrc; + } + + /** + * @return Connection URL. + */ + public String getConnUrl() { + return connUrl; + } + + /** + * @param connUrl Connection URL. + */ + public void setConnUrl(String connUrl) { + this.connUrl = connUrl; + } + + /** + * @return Password for database access. + */ + public String getPassword() { + return passwd; + } + + /** + * @param passwd Password for database access. + */ + public void setPassword(String passwd) { + this.passwd = passwd; + } + + /** + * @return User name for database access. + */ + public String getUser() { + return user; + } + + /** + * @param user User name for database access. + */ + public void setUser(String user) { + this.user = user; + } + + /** + * @return Paths to xml with type mapping description. + */ + public Collection<String> getTypeMetadataPaths() { + return typeMetadataPaths; + } + + /** + * Set paths to xml with type mapping description. + * + * @param typeMetadataPaths Paths to xml. + */ + public void setTypeMetadataPaths(Collection<String> typeMetadataPaths) { + this.typeMetadataPaths = typeMetadataPaths; + } + + /** + * Set type mapping description. + * + * @param typeMetadata Type mapping description. + */ + public void setTypeMetadata(Collection<GridCacheQueryTypeMetadata> typeMetadata) { + this.typeMetadata = typeMetadata; + } + + /** + * Get database dialect. + * + * @return Database dialect. + */ + public BasicJdbcDialect getDialect() { + return dialect; + } + + /** + * Set database dialect. + * + * @param dialect Database dialect. + */ + public void setDialect(BasicJdbcDialect dialect) { + this.dialect = dialect; + } + + /** + * Get Max workers thread count. These threads are responsible for execute query. + * + * @return Max workers thread count. + */ + public int getMaxPoolSize() { + return maxPoolSz; + } + + /** + * Set Max workers thread count. These threads are responsible for execute query. + * + * @param maxPoolSz Max workers thread count. + */ + public void setMaxPoolSize(int maxPoolSz) { + this.maxPoolSz = maxPoolSz; + } + + /** + * Get maximum batch size for put and remove operations. + * + * @return Maximum batch size. + */ + public int getBatchSize() { + return batchSz; + } + + /** + * Set maximum batch size for put and remove operations. + * + * @param batchSz Maximum batch size. + */ + public void setBatchSize(int batchSz) { + this.batchSz = batchSz; + } + + @Override public void txEnd(boolean commit) throws CacheWriterException { + // TODO: SPRINT-32 CODE: implement. + } + + @Override public V load(K k) throws CacheLoaderException { + return null; // TODO: SPRINT-32 CODE: implement. + } + + @Override public Map<K, V> loadAll(Iterable<? extends K> iterable) throws CacheLoaderException { + return null; // TODO: SPRINT-32 CODE: implement. + } + + @Override public void write(Cache.Entry<? extends K, ? extends V> entry) throws CacheWriterException { + // TODO: SPRINT-32 CODE: implement. + } + + @Override + public void writeAll(Collection<Cache.Entry<? extends K, ? extends V>> collection) throws CacheWriterException { + // TODO: SPRINT-32 CODE: implement. + } + + @Override public void delete(Object o) throws CacheWriterException { + // TODO: SPRINT-32 CODE: implement. + } + + @Override public void deleteAll(Collection<?> collection) throws CacheWriterException { + // TODO: SPRINT-32 CODE: implement. + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/410f9e5e/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcPojoCacheStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcPojoCacheStore.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcPojoCacheStore.java new file mode 100644 index 0000000..4d9953a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcPojoCacheStore.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.store.jdbc; + +import org.apache.ignite.*; +import org.apache.ignite.cache.store.*; +import org.gridgain.grid.cache.query.*; +import org.gridgain.grid.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import java.lang.reflect.*; +import java.sql.*; +import java.util.*; + +/** + * Base class for {@link CacheStore} that implementation backed by JDBC and POJO via reflection. + * + * This implementation stores objects in underlying database using java beans mapping description via reflection. + */ +public class JdbcPojoCacheStore extends JdbcCacheStore<Object, Object> { + /** + * POJO methods cache. + */ + protected static class PojoMethodsCache { + /** POJO class. */ + protected final Class<?> cls; + + /** Constructor for POJO object. */ + private final Constructor ctor; + + /** Cached setters for POJO object. */ + private final Map<String, Method> getters; + + /** Cached getters for POJO object. */ + private final Map<String, Method> setters; + + /** + * POJO methods cache. + * + * @param clsName Class name. + * @param fields Fields. + */ + public PojoMethodsCache(String clsName, + Collection<GridCacheQueryTypeDescriptor> fields) throws IgniteCheckedException { + + try { + cls = Class.forName(clsName); + + ctor = cls.getDeclaredConstructor(); + + if (!ctor.isAccessible()) + ctor.setAccessible(true); + } + catch (ClassNotFoundException e) { + throw new IgniteCheckedException("Failed to find class: " + clsName, e); + } + catch (NoSuchMethodException e) { + throw new IgniteCheckedException("Failed to find empty constructor for class: " + clsName, e); + } + + setters = U.newHashMap(fields.size()); + + getters = U.newHashMap(fields.size()); + + for (GridCacheQueryTypeDescriptor field : fields) { + String prop = capitalFirst(field.getJavaName()); + + try { + getters.put(field.getJavaName(), cls.getMethod("get" + prop)); + } + catch (NoSuchMethodException ignored) { + try { + getters.put(field.getJavaName(), cls.getMethod("is" + prop)); + } + catch (NoSuchMethodException e) { + throw new IgniteCheckedException("Failed to find getter for property " + field.getJavaName() + + " of class: " + cls.getName(), e); + } + } + + try { + setters.put(field.getJavaName(), cls.getMethod("set" + prop, field.getJavaType())); + } + catch (NoSuchMethodException e) { + throw new IgniteCheckedException("Failed to find setter for property " + field.getJavaName() + + " of class: " + clsName, e); + } + } + } + + /** + * Capitalizes the first character of the given string. + * + * @param str String. + * @return String with capitalized first character. + */ + @Nullable private String capitalFirst(@Nullable String str) { + return str == null ? null : + str.isEmpty() ? "" : Character.toUpperCase(str.charAt(0)) + str.substring(1); + } + + /** + * Construct new instance of pojo object. + * + * @return pojo object. + * @throws IgniteCheckedException If construct new instance failed. + */ + protected Object newInstance() throws IgniteCheckedException { + try { + return ctor.newInstance(); + } + catch (Exception e) { + throw new IgniteCheckedException("Failed to create new instance for class: " + cls, e); + } + } + } + + /** Methods cache. */ + protected Map<String, PojoMethodsCache> mtdsCache; + + /** {@inheritDoc} */ + @Override protected void buildTypeCache() throws IgniteCheckedException { + entryQtyCache = U.newHashMap(typeMetadata.size()); + + mtdsCache = U.newHashMap(typeMetadata.size() * 2); + + for (GridCacheQueryTypeMetadata type : typeMetadata) { + PojoMethodsCache keyCache = new PojoMethodsCache(type.getKeyType(), type.getKeyDescriptors()); + + mtdsCache.put(type.getKeyType(), keyCache); + + entryQtyCache.put(keyCache.cls, new QueryCache(dialect, type)); + + mtdsCache.put(type.getType(), new PojoMethodsCache(type.getType(), type.getValueDescriptors())); + } + } + + /** {@inheritDoc} */ + @Override protected <R> R buildObject(String typeName, Collection<GridCacheQueryTypeDescriptor> fields, + ResultSet rs) throws IgniteCheckedException { + PojoMethodsCache t = mtdsCache.get(typeName); + + Object obj = t.newInstance(); + + try { + for (GridCacheQueryTypeDescriptor field : fields) + t.setters.get(field.getJavaName()).invoke(obj, rs.getObject(field.getDbName())); + + return (R)obj; + } + catch (Exception e) { + throw new IgniteCheckedException("Failed to read object of class: " + typeName, e); + } + } + + /** {@inheritDoc} */ + @Nullable @Override protected Object extractField(String typeName, String fieldName, Object obj) + throws IgniteCheckedException { + try { + PojoMethodsCache mc = mtdsCache.get(typeName); + + return mc.getters.get(fieldName).invoke(obj); + } + catch (Exception e) { + throw new IgniteCheckedException("Failed to read object of class: " + typeName, e); + } + } + + /** {@inheritDoc} */ + @Override protected Object typeKey(Object key) { + return key.getClass(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/410f9e5e/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/BasicJdbcDialect.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/BasicJdbcDialect.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/BasicJdbcDialect.java new file mode 100644 index 0000000..e6b4367 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/BasicJdbcDialect.java @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.store.jdbc.dialect; + +import org.gridgain.grid.util.typedef.*; +import org.gridgain.grid.util.typedef.internal.*; + +import java.util.*; + +/** + * Represents a dialect of SQL implemented by a particular RDBMS. + */ +public class BasicJdbcDialect { + /** Default max query parameters count. */ + protected static final int DFLT_MAX_PARAMS_CNT = 2000; + + /** Max query parameters count. */ + protected int maxParamsCnt = DFLT_MAX_PARAMS_CNT; + + /** + * Concatenates elements using provided separator. + * + * @param elems Concatenated elements. + * @param f closure used for transform element. + * @param start Start string. + * @param sep Separator. + * @param end End string. + * @return Concatenated string. + */ + protected static <T> String mkString(Iterable<T> elems, C1<T, String> f, String start, String sep, String end) { + SB sb = new SB(start); + + boolean first = true; + + for (T elem : elems) { + if (!first) + sb.a(sep); + + sb.a(f.apply(elem)); + + first = false; + } + + return sb.a(end).toString(); + } + + /** + * Concatenates elements using provided separator. + * + * @param strs Concatenated string. + * @param start Start string. + * @param sep Delimiter. + * @param end End string. + * @return Concatenated string. + */ + protected static String mkString(Iterable<String> strs, String start, String sep, String end) { + return mkString(strs, new C1<String, String>() { + @Override public String apply(String s) { + return s; + } + }, start, sep, end); + } + + /** + * Concatenates strings using provided separator. + * + * @param strs Concatenated string. + * @param sep Separator. + * @return Concatenated string. + */ + protected static String mkString(Iterable<String> strs, String sep) { + return mkString(strs, new C1<String, String>() { + @Override public String apply(String s) { + return s; + } + }, "", sep, ""); + } + + /** + * Concatenates elements using provided delimiter. + * + * @param str Repeated string. + * @param cnt Repeat count. + * @param start Start string. + * @param sep Separator. + * @param end End string. + */ + protected static String repeat(String str, int cnt, String start, String sep, String end) { + SB sb = new SB(str.length() * cnt + sep.length() * (cnt - 1) + start.length() + end.length()); + + sb.a(start); + + for (int i = 0; i < cnt; i++) { + if (i > 0) + sb.a(sep); + + sb.a(str); + } + + return sb.a(end).toString(); + } + + /** + * Construct where part of query. + * + * @param keyCols Database key columns. + * @param keyCnt Key count. + */ + private static String where(Collection<String> keyCols, int keyCnt) { + SB sb = new SB(); + + if (keyCols.size() == 1) { + String keyCol = keyCols.iterator().next(); + + if (keyCnt == 1) + sb.a(keyCol + "=?"); + else + sb.a(repeat("?", keyCnt, keyCol + " IN (", ",", ")")); + } + else { + String keyParams = mkString(keyCols, new C1<String, String>() { + @Override public String apply(String s) { + return s + "=?"; + } + }, "(", " AND ", ")"); + + sb.a(repeat(keyParams, keyCnt, "", " OR ", "")); + } + + return sb.toString(); + } + + /** + * Construct load cache query. + * + * @param schema Database schema name. + * @param tblName Database table name. + * @param uniqCols Database unique value columns. + * @return Load cache query. + */ + public String loadCacheQuery(String schema, String tblName, Iterable<String> uniqCols) { + return String.format("SELECT %s FROM %s.%s", mkString(uniqCols, ","), schema, tblName); + } + + /** + * Construct load query. + * + * @param schema Database schema name. + * @param tblName Database table name. + * @param keyCols Database key columns. + * @param cols Selected columns. + * @param keyCnt Key count. + * @return Load query. + */ + public String loadQuery(String schema, String tblName, Collection<String> keyCols, Iterable<String> cols, + int keyCnt) { + assert !keyCols.isEmpty(); + + String params = where(keyCols, keyCnt); + + return String.format("SELECT %s FROM %s.%s WHERE %s", mkString(cols, ","), schema, tblName, params); + } + + /** + * Construct insert query. + * + * @param schema Database schema name. + * @param tblName Database table name. + * @param keyCols Database key columns. + * @param valCols Database value columns. + */ + public String insertQuery(String schema, String tblName, Collection<String> keyCols, Collection<String> valCols) { + Collection<String> cols = F.concat(false, keyCols, valCols); + + return String.format("INSERT INTO %s.%s(%s) VALUES(%s)", schema, tblName, mkString(cols, ","), + repeat("?", cols.size(), "", ",", "")); + } + + /** + * Construct update query. + * + * @param schema Database schema name. + * @param tblName Database table name. + * @param keyCols Database key columns. + * @param valCols Database value columns. + */ + public String updateQuery(String schema, String tblName, Collection<String> keyCols, Iterable<String> valCols) { + String params = mkString(valCols, new C1<String, String>() { + @Override public String apply(String s) { + return s + "=?"; + } + }, "", ",", ""); + + return String.format("UPDATE %s.%s SET %s WHERE %s", schema, tblName, params, where(keyCols, 1)); + } + + /** + * @return {@code True} if database support merge operation. + */ + public boolean hasMerge() { + return false; + } + + /** + * Construct merge query. + * + * @param schema Database schema name. + * @param tblName Database table name. + * @param keyCols Database key columns. + * @param uniqCols Database unique value columns. + * @return Put query. + */ + public String mergeQuery(String schema, String tblName, Collection<String> keyCols, Collection<String> uniqCols) { + return ""; + } + + /** + * Construct remove query. + * + * @param schema Database schema name. + * @param tblName Database table name. + * @param keyCols Database key columns. + * @return Remove query. + */ + public String removeQuery(String schema, String tblName, Iterable<String> keyCols) { + String whereParams = mkString(keyCols, new C1<String, String>() { + @Override public String apply(String s) { + return s + "=?"; + } + }, "", " AND ", ""); + + return String.format("DELETE FROM %s.%s WHERE %s", schema, tblName, whereParams); + } + + /** + * Get max query parameters count. + * + * @return Max query parameters count. + */ + public int getMaxParamsCnt() { + return maxParamsCnt; + } + + /** + * Set max query parameters count. + * + * @param maxParamsCnt Max query parameters count. + */ + public void setMaxParamsCnt(int maxParamsCnt) { + this.maxParamsCnt = maxParamsCnt; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/410f9e5e/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/DB2Dialect.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/DB2Dialect.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/DB2Dialect.java new file mode 100644 index 0000000..4b5e7e7 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/DB2Dialect.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.store.jdbc.dialect; + +import org.gridgain.grid.util.typedef.*; + +import java.util.*; + +/** + * A dialect compatible with the DB2 database. + */ +public class DB2Dialect extends BasicJdbcDialect { + /** {@inheritDoc} */ + @Override public boolean hasMerge() { + return true; + } + + /** {@inheritDoc} */ + @Override public String mergeQuery(String schema, String tblName, Collection<String> keyCols, + Collection<String> uniqCols) { + + Collection<String> cols = F.concat(false, keyCols, uniqCols); + + String colsLst = mkString(cols, ", "); + + String match = mkString(keyCols, new C1<String, String>() { + @Override public String apply(String col) { + return String.format("t.%s=v.%s", col, col); + } + }, "", ", ", ""); + + String setCols = mkString(uniqCols, new C1<String, String>() { + @Override public String apply(String col) { + return String.format("t.%s = v.%s", col, col); + } + }, "", ", ", ""); + + String valuesCols = mkString(cols, new C1<String, String>() { + @Override public String apply(String col) { + return "v." + col; + } + }, "", ", ", ""); + + return String.format("MERGE INTO %s.%s t" + + " USING (VALUES(%s)) AS v (%s)" + + " ON %s" + + " WHEN MATCHED THEN" + + " UPDATE SET %s" + + " WHEN NOT MATCHED THEN" + + " INSERT (%s) VALUES (%s)", schema, tblName, repeat("?", cols.size(), "", ",", ""), colsLst, + match, setCols, colsLst, valuesCols); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/410f9e5e/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/H2Dialect.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/H2Dialect.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/H2Dialect.java new file mode 100644 index 0000000..d6ed168 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/H2Dialect.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.store.jdbc.dialect; + +import org.gridgain.grid.util.typedef.*; + +import java.util.*; + +/** + * A dialect compatible with the H2 database. + */ +public class H2Dialect extends BasicJdbcDialect { + /** {@inheritDoc} */ + @Override public boolean hasMerge() { + return true; + } + + /** {@inheritDoc} */ + @Override public String mergeQuery(String schema, String tblName, Collection<String> keyCols, + Collection<String> uniqCols) { + Collection<String> cols = F.concat(false, keyCols, uniqCols); + + return String.format("MERGE INTO %s (%s) KEY (%s) VALUES(%s)", tblName, mkString(cols, ","), + mkString(keyCols, ","), repeat("?", cols.size(), "", ", ", "")); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/410f9e5e/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/MySQLDialect.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/MySQLDialect.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/MySQLDialect.java new file mode 100644 index 0000000..0072b31 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/MySQLDialect.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.store.jdbc.dialect; + +import org.gridgain.grid.util.typedef.*; + +import java.util.*; + +/** + * A dialect compatible with the MySQL database. + */ +public class MySQLDialect extends BasicJdbcDialect { + /** {@inheritDoc} */ + @Override public boolean hasMerge() { + return true; + } + + /** {@inheritDoc} */ + @Override public String mergeQuery(String schema, String tblName, Collection<String> keyCols, + Collection<String> uniqCols) { + + Collection<String> cols = F.concat(false, keyCols, uniqCols); + + String updPart = mkString(uniqCols, new C1<String, String>() { + @Override public String apply(String col) { + return String.format("%s = VALUES(%s)", col, col); + } + }, "", ", ", ""); + + return String.format("INSERT INTO %s.%s (%s) VALUES (%s) ON DUPLICATE KEY UPDATE %s", schema, tblName, + mkString(cols, ", "), repeat("?", cols.size(), "", ",", ""), updPart); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/410f9e5e/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/OracleDialect.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/OracleDialect.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/OracleDialect.java new file mode 100644 index 0000000..6fbc409 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/OracleDialect.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.store.jdbc.dialect; + +import org.gridgain.grid.util.typedef.*; + +import java.util.*; + +/** + * A dialect compatible with the Oracle database. + */ +public class OracleDialect extends BasicJdbcDialect { + /** {@inheritDoc} */ + @Override public boolean hasMerge() { + return true; + } + + /** {@inheritDoc} */ + @Override public String mergeQuery(String schema, String tblName, Collection<String> keyCols, + Collection<String> uniqCols) { + Collection<String> cols = F.concat(false, keyCols, uniqCols); + + String colsLst = mkString(cols, ", "); + + String selCols = mkString(uniqCols, new C1<String, String>() { + @Override public String apply(String col) { + return String.format("? AS %s", col); + } + }, "", ", ", ""); + + String match = mkString(keyCols, new C1<String, String>() { + @Override public String apply(String col) { + return String.format("t.%s=v.%s", col, col); + } + }, "", ", ", ""); + + String setCols = mkString(uniqCols, new C1<String, String>() { + @Override public String apply(String col) { + return String.format("t.%s = v.%s", col, col); + } + }, "", ", ", ""); + + String valuesCols = mkString(cols, new C1<String, String>() { + @Override public String apply(String col) { + return "v." + col; + } + }, "", ", ", ""); + + return String.format("MERGE INTO %s.%s t" + + " USING (SELECT %s FROM dual) AS v" + + " ON %s" + + " WHEN MATCHED THEN" + + " UPDATE SET %s" + + " WHEN NOT MATCHED THEN" + + " INSERT (%s) VALUES (%s)", schema, tblName, selCols, match, setCols, colsLst, valuesCols); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/410f9e5e/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/SQLServerDialect.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/SQLServerDialect.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/SQLServerDialect.java new file mode 100644 index 0000000..f4ca4ec --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/SQLServerDialect.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.store.jdbc.dialect; + +import org.gridgain.grid.util.typedef.*; + +import java.util.*; + +/** + * A dialect compatible with the MsSQL database. + */ +public class SQLServerDialect extends BasicJdbcDialect { + /** {@inheritDoc} */ + @Override public boolean hasMerge() { + return true; + } + + /** {@inheritDoc} */ + @Override public String mergeQuery(String schema, String tblName, Collection<String> keyCols, + Collection<String> uniqCols) { + Collection<String> cols = F.concat(false, keyCols, uniqCols); + + String colsLst = mkString(cols, ", "); + + String match = mkString(keyCols, new C1<String, String>() { + @Override public String apply(String col) { + return String.format("t.%s=v.%s", col, col); + } + }, "", ", ", ""); + + String setCols = mkString(uniqCols, new C1<String, String>() { + @Override public String apply(String col) { + return String.format("t.%s = v.%s", col, col); + } + }, "", ", ", ""); + + String valuesCols = mkString(cols, new C1<String, String>() { + @Override public String apply(String col) { + return "v." + col; + } + }, "", ", ", ""); + + return String.format("MERGE INTO %s.%s t" + + " USING (VALUES(%s)) AS v (%s)" + + " ON %s" + + " WHEN MATCHED THEN" + + " UPDATE SET %s" + + " WHEN NOT MATCHED THEN" + + " INSERT (%s) VALUES (%s)", schema, tblName, repeat("?", cols.size(), "", ",", ""), colsLst, + match, setCols, colsLst, valuesCols); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/410f9e5e/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/package.html ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/package.html b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/package.html new file mode 100644 index 0000000..88e8a49 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/package.html @@ -0,0 +1,24 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + +<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> +<html> +<body> +<!-- Package description. --> +Contains SQL dialects for ifferent RDBMS. +</body> +</html> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/410f9e5e/modules/core/src/main/java/org/gridgain/grid/cache/query/GridCacheQueryTypeDescriptor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/cache/query/GridCacheQueryTypeDescriptor.java b/modules/core/src/main/java/org/gridgain/grid/cache/query/GridCacheQueryTypeDescriptor.java new file mode 100644 index 0000000..697caef --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/cache/query/GridCacheQueryTypeDescriptor.java @@ -0,0 +1,135 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.cache.query; + +import org.gridgain.grid.util.typedef.internal.*; + +/** + * Type descriptor for field in java and database. + */ +public class GridCacheQueryTypeDescriptor { + /** Column name in database. */ + private String javaName; + + /** Corresponding java type. */ + private Class<?> javaType; + + /** Column name in database. */ + private String dbName; + + /** Column JDBC type in database. */ + private int dbType; + + /** + * Default constructor. + */ + public GridCacheQueryTypeDescriptor() { + // No-op. + } + + /** + * @param javaName Field name in java object. + * @param javaType Field java type. + * @param dbName Column name in database. + * @param dbType Column JDBC type in database. + */ + public GridCacheQueryTypeDescriptor(String javaName, Class<?> javaType, String dbName, int dbType) { + this.javaName = javaName; + this.javaType = javaType; + this.dbName = dbName; + this.dbType = dbType; + } + + /** + * @return Field name in java object. + */ + public String getJavaName() { + return javaName; + } + + /** + * @param javaName Field name in java object. + */ + public void setJavaName(String javaName) { + this.javaName = javaName; + } + + /** + * @return Field java type. + */ + public Class<?> getJavaType() { + return javaType; + } + + /** + * @param javaType Corresponding java type. + */ + public void setJavaType(Class<?> javaType) { + this.javaType = javaType; + } + + /** + * @return Column name in database. + */ + public String getDbName() { + return dbName; + } + + /** + * @param dbName Column name in database. + */ + public void setDbName(String dbName) { + this.dbName = dbName; + } + + /** + * @return Column JDBC type in database. + */ + public int getDbType() { + return dbType; + } + + /** + * @param dbType Column JDBC type in database. + */ + public void setDbType(int dbType) { + this.dbType = dbType; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (!(o instanceof GridCacheQueryTypeDescriptor)) + return false; + + GridCacheQueryTypeDescriptor that = (GridCacheQueryTypeDescriptor)o; + + return javaName.equals(that.javaName) && dbName.equals(that.dbName) && + javaType == that.javaType && dbType == that.dbType; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int res = javaName.hashCode(); + + res = 31 * res + dbName.hashCode(); + res = 31 * res + javaType.hashCode(); + res = 31 * res + dbType; + + return res; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridCacheQueryTypeDescriptor.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/410f9e5e/modules/core/src/main/java/org/gridgain/grid/cache/query/GridCacheQueryTypeMetadata.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/cache/query/GridCacheQueryTypeMetadata.java b/modules/core/src/main/java/org/gridgain/grid/cache/query/GridCacheQueryTypeMetadata.java index d4a3eb9..ca511d7 100644 --- a/modules/core/src/main/java/org/gridgain/grid/cache/query/GridCacheQueryTypeMetadata.java +++ b/modules/core/src/main/java/org/gridgain/grid/cache/query/GridCacheQueryTypeMetadata.java @@ -28,24 +28,40 @@ import java.util.*; */ public class GridCacheQueryTypeMetadata { /** Type name, e.g. class name. */ - @GridToStringInclude private String type; + /** Schema name in database. */ + private String schema; + + /** Table name in database. */ + private String tbl; + + /** Key class. */ + private String keyType; + + /** Type descriptors. */ + @GridToStringInclude + private Collection<GridCacheQueryTypeDescriptor> keyDescs; + + /** Type descriptors. */ + @GridToStringInclude + private Collection<GridCacheQueryTypeDescriptor> valDescs; + /** Fields to be queried, in addition to indexed fields. */ @GridToStringInclude - private Map<String, Class<?>> qryFlds = new HashMap<>(); + private Map<String, Class<?>> qryFlds; /** Fields to index in ascending order. */ @GridToStringInclude - private Map<String, Class<?>> ascFlds = new HashMap<>(); + private Map<String, Class<?>> ascFlds; /** Fields to index in descending order. */ @GridToStringInclude - private Map<String, Class<?>> descFlds = new HashMap<>(); + private Map<String, Class<?>> descFlds; /** Fields to index as text. */ @GridToStringInclude - private Collection<String> txtFlds = new LinkedHashSet<>(); + private Collection<String> txtFlds; /** Fields to create group indexes for. */ @GridToStringInclude @@ -55,7 +71,19 @@ public class GridCacheQueryTypeMetadata { * Default constructor. */ public GridCacheQueryTypeMetadata() { - // No-op. + keyDescs = new ArrayList<>(); + + valDescs = new ArrayList<>(); + + qryFlds = new LinkedHashMap<>(); + + ascFlds = new LinkedHashMap<>(); + + descFlds = new LinkedHashMap<>(); + + txtFlds = new LinkedHashSet<>(); + + grps = new LinkedHashMap<>(); } /** @@ -63,13 +91,20 @@ public class GridCacheQueryTypeMetadata { */ public GridCacheQueryTypeMetadata(GridCacheQueryTypeMetadata src) { type = src.getType(); + keyType = src.getKeyType(); + + schema = src.getSchema(); + tbl = src.getTableName(); - qryFlds = new HashMap<>(src.getQueryFields()); - ascFlds = new HashMap<>(src.getAscendingFields()); - descFlds = new HashMap<>(src.getDescendingFields()); - txtFlds = new HashSet<>(src.getTextFields()); + keyDescs = new ArrayList<>(src.getKeyDescriptors()); + valDescs = new ArrayList<>(src.getValueDescriptors()); - grps = new HashMap<>(src.getGroups()); + qryFlds = new LinkedHashMap<>(src.getQueryFields()); + ascFlds = new LinkedHashMap<>(src.getAscendingFields()); + descFlds = new LinkedHashMap<>(src.getDescendingFields()); + txtFlds = new LinkedHashSet<>(src.getTextFields()); + + grps = new LinkedHashMap<>(src.getGroups()); } /** @@ -100,6 +135,96 @@ public class GridCacheQueryTypeMetadata { } /** + * Gets database schema name. + * + * @return Schema name. + */ + public String getSchema() { + return schema; + } + + /** + * Sets database schema name. + * + * @param schema Schema name. + */ + public void setSchema(String schema) { + this.schema = schema; + } + + /** + * Gets table name in database. + * + * @return Table name in database. + */ + public String getTableName() { + return tbl; + } + + /** + * Table name in database. + * + * @param tbl Table name in database. + */ + public void setTableName(String tbl) { + this.tbl = tbl; + } + + /** + * Gets key type. + * + * @return Key type. + */ + public String getKeyType() { + return keyType; + } + + /** + * Sets key type. + * + * @param keyType Key type. + */ + public void setKeyType(String keyType) { + this.keyType = keyType; + } + + /** + * Gets key fields type descriptors. + * + * @return Key fields type descriptors. + */ + public Collection<GridCacheQueryTypeDescriptor> getKeyDescriptors() { + return keyDescs; + } + + /** + * Sets key fields type descriptors. + * + * @param keyDescs Key fields type descriptors. + */ + public void setKeyDescriptors(Collection<GridCacheQueryTypeDescriptor> keyDescs) { + this.keyDescs = keyDescs; + } + + /** + * Gets value fields type descriptors. + * + * @return Key value type descriptors. + */ + public Collection<GridCacheQueryTypeDescriptor> getValueDescriptors() { + return valDescs; + } + + /** + * Sets value fields type descriptors. + * + * @param valDescs Value fields type descriptors. + */ + public void setValueDescriptors(Collection<GridCacheQueryTypeDescriptor> valDescs) { + this.valDescs = valDescs; + } + + /** * Gets query-enabled fields. * * @return Collection of fields available for query. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/410f9e5e/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/AbstractCacheStoreMultithreadedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/AbstractCacheStoreMultithreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/AbstractCacheStoreMultithreadedSelfTest.java new file mode 100644 index 0000000..483b2c3 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/AbstractCacheStoreMultithreadedSelfTest.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.store.jdbc; + +import org.apache.ignite.cache.store.jdbc.model.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.gridgain.grid.cache.*; +import org.gridgain.grid.util.typedef.internal.*; +import org.gridgain.testframework.junits.common.*; +import org.jetbrains.annotations.*; + +import java.sql.*; +import java.util.*; +import java.util.concurrent.*; + +import static org.gridgain.testframework.GridTestUtils.*; + +/** + * + */ +public abstract class AbstractCacheStoreMultithreadedSelfTest<T extends JdbcCacheStore> extends GridCommonAbstractTest { + /** Default connection URL (value is <tt>jdbc:h2:mem:jdbcCacheStore;DB_CLOSE_DELAY=-1</tt>). */ + protected static final String DFLT_CONN_URL = "jdbc:h2:mem:autoCacheStore;DB_CLOSE_DELAY=-1"; + + /** IP finder. */ + protected static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** Number of transactions. */ + private static final int TX_CNT = 1000; + + /** Number of transactions. */ + private static final int BATCH_CNT = 2000; + + /** Cache store. */ + protected T store; + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + store = store(); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + Class.forName("org.h2.Driver"); + Connection conn = DriverManager.getConnection(DFLT_CONN_URL, "sa", ""); + + Statement stmt = conn.createStatement(); + + stmt.executeUpdate("DROP TABLE IF EXISTS Organization"); + stmt.executeUpdate("DROP TABLE IF EXISTS Person"); + + stmt.executeUpdate("CREATE TABLE Organization (id integer PRIMARY KEY, name varchar(50), city varchar(50))"); + stmt.executeUpdate("CREATE TABLE Person (id integer PRIMARY KEY, org_id integer, name varchar(50))"); + + stmt.executeUpdate("CREATE INDEX Org_Name_IDX On Organization (name)"); + stmt.executeUpdate("CREATE INDEX Org_Name_City_IDX On Organization (name, city)"); + stmt.executeUpdate("CREATE INDEX Person_Name_IDX1 On Person (name)"); + stmt.executeUpdate("CREATE INDEX Person_Name_IDX2 On Person (name desc)"); + + conn.commit(); + + U.closeQuiet(stmt); + + U.closeQuiet(conn); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + } + + /** + * @return New store. + * @throws Exception In case of error. + */ + protected abstract T store() throws Exception; + + /** + * @throws Exception If failed. + */ + public void testMultithreadedPutAll() throws Exception { + startGrid(); + + runMultiThreaded(new Callable<Object>() { + private final Random rnd = new Random(); + + @Nullable @Override public Object call() throws Exception { + for (int i = 0; i < TX_CNT; i++) { + int cnt = rnd.nextInt(BATCH_CNT); + + Map<Object, Object> map = U.newHashMap(cnt); + + for (int j = 0; j < cnt; j++) { + int id = rnd.nextInt(); + + if (rnd.nextBoolean()) + map.put(new OrganizationKey(id), new Organization(id, "Name" + id, "City" + id)); + else + map.put(new PersonKey(id), new Person(id, rnd.nextInt(), "Name" + id)); + } + + GridCache<Object, Object> cache = cache(); + + cache.putAll(map); + } + + return null; + } + }, 8, "putAll"); + } +}