Merge branch 'sprint-1' into ignite-188 Conflicts: modules/core/src/main/java/org/apache/ignite/IgniteCache.java modules/core/src/main/java/org/apache/ignite/IgniteQueue.java modules/core/src/main/java/org/apache/ignite/IgniteSet.java modules/core/src/test/java/org/apache/ignite/internal/GridTaskExecutionContextSelfTest.java modules/ssh/src/test/java/org/apache/ignite/internal/IgniteProjectionStartStopRestartSelfTest.java
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/33834a34 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/33834a34 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/33834a34 Branch: refs/heads/ignite-188 Commit: 33834a3449031786d2b1cab919a09beb15bd8df7 Parents: 0704570 e4365c3 Author: vozerov-gridgain <voze...@gridgain.com> Authored: Fri Feb 6 15:58:20 2015 +0300 Committer: vozerov-gridgain <voze...@gridgain.com> Committed: Fri Feb 6 15:58:20 2015 +0300 ---------------------------------------------------------------------- bin/include/ggservice.exe | Bin 5632 -> 0 bytes bin/include/igniteservice.exe | Bin 0 -> 6144 bytes .../IgniteAtomicReferenceExample.java | 2 +- .../IgniteAtomicStampedExample.java | 2 +- .../MessagingPingPongListenActorExample.java | 4 +- .../streaming/StreamingCheckInExample.java | 2 +- .../StreamingPopularNumbersExample.java | 2 +- .../streaming/StreamingPriceBarsExample.java | 2 +- .../apache/ignite/examples/ComputeExample.java | 6 +- .../examples/ScalarCacheAffinityExample1.scala | 3 +- .../java/org/apache/ignite/IgniteCluster.java | 6 +- .../java/org/apache/ignite/IgniteCompute.java | 18 +- .../java/org/apache/ignite/IgniteQueue.java | 4 +- .../main/java/org/apache/ignite/IgniteSet.java | 2 +- .../ignite/cache/eviction/EvictableEntry.java | 2 +- .../store/jdbc/CacheAbstractJdbcStore.java | 1567 ++++++++++++++++++ .../cache/store/jdbc/CacheJdbcPojoStore.java | 205 +++ .../ignite/cache/store/jdbc/JdbcCacheStore.java | 1560 ----------------- .../cache/store/jdbc/JdbcPojoCacheStore.java | 205 --- .../ignite/internal/IgniteComputeImpl.java | 21 +- .../apache/ignite/internal/IgniteKernal.java | 16 +- .../processors/cache/IgniteCacheProxy.java | 6 - .../GridDistributedCacheAdapter.java | 46 +- .../ignite/internal/util/IgniteUtils.java | 2 +- .../internal/util/lang/GridAbsClosure.java | 4 +- 
.../util/nodestart/GridNodeCallable.java | 29 - .../util/nodestart/GridNodeStartUtils.java | 390 ----- .../nodestart/GridRemoteStartSpecification.java | 279 ---- .../util/nodestart/IgniteNodeCallable.java | 30 + .../util/nodestart/IgniteNodeStartUtils.java | 391 +++++ .../IgniteRemoteStartSpecification.java | 279 ++++ .../util/nodestart/IgniteSshProcessor.java | 2 +- .../apache/ignite/internal/util/typedef/CA.java | 3 +- .../ignite/internal/util/typedef/CAX.java | 2 +- .../org/apache/ignite/lang/IgniteRunnable.java | 2 +- .../optimized/OptimizedObjectInputStream.java | 2 + .../ignite/messaging/MessagingListenActor.java | 18 +- .../core/src/test/bin/start-nodes-custom.bat | 2 +- modules/core/src/test/config/start-nodes.ini | 6 +- .../core/src/test/config/store/jdbc/Ignite.xml | 50 + ...ractJdbcCacheStoreMultithreadedSelfTest.java | 196 --- .../CacheJdbcPojoStoreMultitreadedSelfTest.java | 35 + .../store/jdbc/CacheJdbcPojoStoreTest.java | 759 +++++++++ ...eJdbcStoreAbstractMultithreadedSelfTest.java | 275 +++ .../PojoJdbcCacheStoreMultitreadedSelfTest.java | 34 - .../store/jdbc/PojoJdbcCacheStoreTest.java | 702 -------- .../store/jdbc/model/PersonComplexKey.java | 146 ++ .../GridJobMasterLeaveAwareSelfTest.java | 4 +- .../internal/GridListenActorSelfTest.java | 11 +- .../internal/GridManagementJobSelfTest.java | 7 +- .../internal/GridProjectionAbstractTest.java | 18 +- .../GridTaskExecutionContextSelfTest.java | 6 +- .../IgniteComputeEmptyClusterGroupTest.java | 4 +- .../cache/GridCacheAbstractFullApiSelfTest.java | 8 +- ...eAbstractDataStructuresFailoverSelfTest.java | 2 +- ...GridCacheQueueMultiNodeAbstractSelfTest.java | 2 +- .../GridCacheSetAbstractSelfTest.java | 3 +- ...xOriginatingNodeFailureAbstractSelfTest.java | 3 +- ...cOriginatingNodeFailureAbstractSelfTest.java | 4 +- .../near/GridCacheNearEvictionSelfTest.java | 5 +- .../GridCachePartitionedAffinitySelfTest.java | 2 +- .../GridCachePartitionedEntryLockSelfTest.java | 3 +- 
...achePartitionedMultiNodeFullApiSelfTest.java | 2 +- .../closure/GridClosureProcessorSelfTest.java | 14 +- .../loadtests/colocation/GridTestMain.java | 2 +- .../marshaller/GridMarshallerAbstractTest.java | 2 +- .../spi/GridTcpSpiForwardingSelfTest.java | 3 +- .../ignite/testsuites/IgniteCacheTestSuite.java | 2 + .../GridHibernateAccessStrategyAdapter.java | 3 +- .../ignite/scalar/ScalarConversions.scala | 8 +- .../ignite/scalar/lang/ScalarOutClosure.scala | 4 +- .../scalar/lang/ScalarOutClosureFunction.scala | 4 +- .../ignite/schema/model/PojoDescriptor.java | 8 +- .../org/apache/ignite/schema/ui/Controls.java | 4 +- .../apache/ignite/schema/ui/SchemaLoadApp.java | 137 +- .../schema/load/AbstractSchemaLoaderTest.java | 4 +- .../util/nodestart/GridNodeCallableImpl.java | 344 ---- .../util/nodestart/IgniteNodeCallableImpl.java | 344 ++++ .../util/nodestart/IgniteSshProcessorImpl.java | 4 +- .../internal/GridNodeStartUtilsSelfTest.java | 89 - .../GridProjectionStartStopRestartSelfTest.java | 1032 ------------ .../internal/IgniteNodeStartUtilsSelfTest.java | 88 + ...gniteProjectionStartStopRestartSelfTest.java | 1032 ++++++++++++ .../IgniteStartStopRestartTestSuite.java | 4 +- .../commands/start/VisorStartCommand.scala | 4 +- .../commands/open/VisorOpenCommandSpec.scala | 6 +- modules/winservice/IgniteService.sln | 22 + .../winservice/IgniteService/IgniteService.cs | 170 ++ .../IgniteService/IgniteService.csproj | 90 + modules/winservice/README.md | 3 + .../yardstick/config/benchmark-store.properties | 35 +- .../yardstick/config/ignite-store-config.xml | 11 +- .../jdbc/IgniteJdbcStoreAbstractBenchmark.java | 95 +- .../store/jdbc/IgniteJdbcStoreGetBenchmark.java | 10 +- .../jdbc/IgniteJdbcStoreGetTxBenchmark.java | 47 + .../store/jdbc/IgniteJdbcStorePutBenchmark.java | 5 + .../jdbc/IgniteJdbcStorePutGetBenchmark.java | 5 + .../jdbc/IgniteJdbcStorePutGetTxBenchmark.java | 52 + .../jdbc/IgniteJdbcStorePutTxBenchmark.java | 47 + 
.../yardstick/compute/IgniteRunBenchmark.java | 5 +- .../yardstick/compute/model/NoopCallable.java | 5 +- 101 files changed, 6029 insertions(+), 5118 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/33834a34/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteAtomicReferenceExample.java ---------------------------------------------------------------------- diff --cc examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteAtomicReferenceExample.java index 9b5e3d8,0000000..866a426 mode 100644,000000..100644 --- a/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteAtomicReferenceExample.java +++ b/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteAtomicReferenceExample.java @@@ -1,110 -1,0 +1,110 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples.datastructures; + +import org.apache.ignite.*; +import org.apache.ignite.examples.datagrid.*; +import org.apache.ignite.lang.*; + +import java.util.*; + +/** + * Demonstrates a simple usage of distributed atomic reference. 
+ * <p> + * Remote nodes should always be started with special configuration file which + * enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-cache.xml'}. + * <p> + * Alternatively you can run {@link CacheNodeStartup} in another JVM which will + * start node with {@code examples/config/example-cache.xml} configuration. + */ +public final class IgniteAtomicReferenceExample { + /** + * Executes example. + * + * @param args Command line arguments, none required. + * @throws Exception If example execution failed. + */ + public static void main(String[] args) throws Exception { + try (Ignite ignite = Ignition.start("examples/config/example-cache.xml")) { + System.out.println(); + System.out.println(">>> Atomic reference example started."); + + // Make name of atomic reference. + final String refName = UUID.randomUUID().toString(); + + // Make value of atomic reference. + String val = UUID.randomUUID().toString(); + + // Initialize atomic reference. + IgniteAtomicReference<String> ref = ignite.atomicReference(refName, val, true); + + System.out.println("Atomic reference initial value : " + ref.get() + '.'); + + // Make closure for checking atomic reference value on cluster. - Runnable c = new ReferenceClosure(refName); ++ IgniteRunnable c = new ReferenceClosure(refName); + + // Check atomic reference on all cluster nodes. + ignite.compute().run(c); + + // Make new value of atomic reference. + String newVal = UUID.randomUUID().toString(); + + System.out.println("Try to change value of atomic reference with wrong expected value."); + + ref.compareAndSet("WRONG EXPECTED VALUE", newVal); // Won't change. + + // Check atomic reference on all cluster nodes. + // Atomic reference value shouldn't be changed. + ignite.compute().run(c); + + System.out.println("Try to change value of atomic reference with correct expected value."); + + ref.compareAndSet(val, newVal); + + // Check atomic reference on all cluster nodes. 
+ // Atomic reference value should be changed. + ignite.compute().run(c); + } + + System.out.println(); + System.out.println("Finished atomic reference example..."); + System.out.println("Check all nodes for output (this node is also part of the cluster)."); + } + + /** + * Obtains atomic reference. + */ + private static class ReferenceClosure implements IgniteRunnable { + /** Reference name. */ + private final String refName; + + /** + * @param refName Reference name. + */ + ReferenceClosure(String refName) { + this.refName = refName; + } + + /** {@inheritDoc} */ + @Override public void run() { + IgniteAtomicReference<String> ref = Ignition.ignite().atomicReference(refName, null, true); + + System.out.println("Atomic reference value is " + ref.get() + '.'); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/33834a34/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteAtomicStampedExample.java ---------------------------------------------------------------------- diff --cc examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteAtomicStampedExample.java index ed38282,0000000..14f52b1 mode 100644,000000..100644 --- a/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteAtomicStampedExample.java +++ b/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteAtomicStampedExample.java @@@ -1,117 -1,0 +1,117 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples.datastructures; + +import org.apache.ignite.*; +import org.apache.ignite.examples.datagrid.*; +import org.apache.ignite.lang.*; + +import java.util.*; + +/** + * Demonstrates a simple usage of distributed atomic stamped. + * <p> + * Remote nodes should always be started with special configuration file which + * enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-cache.xml'}. + * <p> + * Alternatively you can run {@link CacheNodeStartup} in another JVM which will + * start node with {@code examples/config/example-cache.xml} configuration. + */ +public final class IgniteAtomicStampedExample { + /** + * Executes example. + * + * @param args Command line arguments, none required. + * @throws Exception If example execution failed. + */ + public static void main(String[] args) throws Exception { + try (Ignite ignite = Ignition.start("examples/config/example-cache.xml")) { + System.out.println(); + System.out.println(">>> Atomic stamped example started."); + + // Make name of atomic stamped. + String stampedName = UUID.randomUUID().toString(); + + // Make value of atomic stamped. + String val = UUID.randomUUID().toString(); + + // Make stamp of atomic stamped. + String stamp = UUID.randomUUID().toString(); + + // Initialize atomic stamped. 
+ IgniteAtomicStamped<String, String> stamped = ignite.atomicStamped(stampedName, val, stamp, true); + + System.out.println("Atomic stamped initial [value=" + stamped.value() + ", stamp=" + stamped.stamp() + ']'); + + // Make closure for checking atomic stamped. - Runnable c = new StampedUpdateClosure(stampedName); ++ IgniteRunnable c = new StampedUpdateClosure(stampedName); + + // Check atomic stamped on all cluster nodes. + ignite.compute().broadcast(c); + + // Make new value of atomic stamped. + String newVal = UUID.randomUUID().toString(); + + // Make new stamp of atomic stamped. + String newStamp = UUID.randomUUID().toString(); + + System.out.println("Try to change value and stamp of atomic stamped with wrong expected value and stamp."); + + stamped.compareAndSet("WRONG EXPECTED VALUE", newVal, "WRONG EXPECTED STAMP", newStamp); + + // Check atomic stamped on all cluster nodes. + // Atomic stamped value and stamp shouldn't be changed. + ignite.compute().run(c); + + System.out.println("Try to change value and stamp of atomic stamped with correct value and stamp."); + + stamped.compareAndSet(val, newVal, stamp, newStamp); + + // Check atomic stamped on all cluster nodes. + // Atomic stamped value and stamp should be changed. + ignite.compute().run(c); + } + + System.out.println(); + System.out.println("Finished atomic stamped example..."); + System.out.println("Check all nodes for output (this node is also part of the cluster)."); + } + + /** + * Performs update of on an atomic stamped variable in cache. + */ + private static class StampedUpdateClosure implements IgniteRunnable { + /** Atomic stamped variable name. */ + private final String stampedName; + + /** + * @param stampedName Atomic stamped variable name. + */ + StampedUpdateClosure(String stampedName) { + this.stampedName = stampedName; + } + + /** {@inheritDoc} */ + @Override public void run() { + IgniteAtomicStamped<String, String> stamped = Ignition.ignite(). 
+ atomicStamped(stampedName, null, null, true); + + System.out.println("Atomic stamped [value=" + stamped.value() + ", stamp=" + stamped.stamp() + ']'); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/33834a34/modules/core/src/main/java/org/apache/ignite/IgniteCompute.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/33834a34/modules/core/src/main/java/org/apache/ignite/IgniteQueue.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/IgniteQueue.java index b15d06c,26fedf7..299993a --- a/modules/core/src/main/java/org/apache/ignite/IgniteQueue.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteQueue.java @@@ -66,9 -67,9 +66,9 @@@ import java.util.concurrent.* * Instances of distributed cache queues can be created by calling the following method * on {@link Ignite} API: * <ul> - * <li>{@link Ignite#queue(String, org.apache.ignite.configuration.CollectionConfiguration, int, boolean)}</li> - * <li>{@link Ignite#queue(String, int, IgniteCollectionConfiguration)}</li> ++ * <li>{@link Ignite#queue(String, CollectionConfiguration, int, boolean)}</li> * </ul> - * @see Ignite#queue(String, org.apache.ignite.configuration.CollectionConfiguration, int, boolean) - * @see Ignite#queue(String, int, IgniteCollectionConfiguration) ++ * @see Ignite#queue(String, CollectionConfiguration, int, boolean) */ public interface IgniteQueue<T> extends BlockingQueue<T>, Closeable { /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/33834a34/modules/core/src/main/java/org/apache/ignite/IgniteSet.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/IgniteSet.java index 003ce93,1228958..48f27ad --- a/modules/core/src/main/java/org/apache/ignite/IgniteSet.java +++ 
b/modules/core/src/main/java/org/apache/ignite/IgniteSet.java @@@ -31,7 -34,7 +31,7 @@@ import java.util.* * (governed by {@code collocated} parameter). {@code Non-collocated} mode is provided only * for partitioned caches. If {@code collocated} parameter is {@code true}, then all set items * will be collocated on one node, otherwise items will be distributed through all grid nodes. - * @see Ignite#set(String, org.apache.ignite.configuration.CollectionConfiguration, boolean) - * @see Ignite#set(String, IgniteCollectionConfiguration) ++ * @see Ignite#set(String, CollectionConfiguration, boolean) */ public interface IgniteSet<T> extends Set<T>, Closeable { /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/33834a34/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java index 0000000,3d60a74..ab21fe1 mode 000000,100644..100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java @@@ -1,0 -1,1567 +1,1567 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.ignite.cache.store.jdbc; + + import org.apache.ignite.*; + import org.apache.ignite.cache.*; + import org.apache.ignite.cache.store.*; + import org.apache.ignite.cache.store.jdbc.dialect.*; + import org.apache.ignite.internal.util.tostring.*; + import org.apache.ignite.internal.util.typedef.*; + import org.apache.ignite.internal.util.typedef.internal.*; + import org.apache.ignite.lang.*; + import org.apache.ignite.lifecycle.*; + import org.apache.ignite.resources.*; + import org.apache.ignite.transactions.*; + import org.jetbrains.annotations.*; + + import javax.cache.*; + import javax.cache.integration.*; + import javax.sql.*; + import java.sql.*; + import java.util.*; + import java.util.concurrent.*; + import java.util.concurrent.locks.*; + + import static java.sql.Statement.*; + + /** + * Base {@link CacheStore} implementation backed by JDBC. This implementation stores objects in underlying database + * using mapping description. + * <p> + * <h2 class="header">Configuration</h2> + * Sections below describe mandatory and optional configuration settings as well + * as providing example using Java and Spring XML. + * <h3>Mandatory</h3> + * There are no mandatory configuration parameters. + * <h3>Optional</h3> + * <ul> + * <li>Data source (see {@link #setDataSource(DataSource)}</li> + * <li>Maximum batch size for writeAll and deleteAll operations. (see {@link #setBatchSize(int)})</li> + * <li>Max workers thread count. These threads are responsible for load cache. (see {@link #setMaxPoolSize(int)})</li> + * <li>Parallel load cache minimum threshold. (see {@link #setParallelLoadCacheMinimumThreshold(int)})</li> + * </ul> + * <h2 class="header">Java Example</h2> + * <pre name="code" class="java"> + * ... + * JdbcPojoCacheStore store = new JdbcPojoCacheStore(); + * ... 
+ * + * </pre> + * <h2 class="header">Spring Example</h2> + * <pre name="code" class="xml"> + * ... + * <bean id="cache.jdbc.store" + * class="org.apache.ignite.cache.store.jdbc.JdbcPojoCacheStore"> + * <property name="connectionUrl" value="jdbc:h2:mem:"/> + * </bean> + * ... + * </pre> + * <p> + * For information about Spring framework visit <a href="http://www.springframework.org/">www.springframework.org</a> + */ + public abstract class CacheAbstractJdbcStore<K, V> extends CacheStore<K, V> implements LifecycleAware { + /** Max attempt write count. */ + protected static final int MAX_ATTEMPT_WRITE_COUNT = 2; + + /** Default batch size for put and remove operations. */ + protected static final int DFLT_BATCH_SIZE = 512; + + /** Default batch size for put and remove operations. */ + protected static final int DFLT_PARALLEL_LOAD_CACHE_MINIMUM_THRESHOLD = 512; + + /** Connection attribute property name. */ + protected static final String ATTR_CONN_PROP = "JDBC_STORE_CONNECTION"; + + /** Empty column value. */ + protected static final Object[] EMPTY_COLUMN_VALUE = new Object[] { null }; + + /** Auto-injected logger instance. */ - @IgniteLoggerResource ++ @LoggerResource + protected IgniteLogger log; + + /** Lock for metadata cache. */ + @GridToStringExclude + private final Lock cacheMappingsLock = new ReentrantLock(); + + /** Data source. */ + protected DataSource dataSrc; + + /** Cache with entry mapping description. (cache name, (key id, mapping description)). */ + protected volatile Map<String, Map<Object, EntryMapping>> cacheMappings = Collections.emptyMap(); + + /** Database dialect. */ + protected JdbcDialect dialect; + + /** Max workers thread count. These threads are responsible for load cache. */ + private int maxPoolSz = Runtime.getRuntime().availableProcessors(); + + /** Maximum batch size for writeAll and deleteAll operations. */ + private int batchSz = DFLT_BATCH_SIZE; + + /** Parallel load cache minimum threshold. If {@code 0} then load sequentially. 
*/ + private int parallelLoadCacheMinThreshold = DFLT_PARALLEL_LOAD_CACHE_MINIMUM_THRESHOLD; + + /** + * Get field value from object. + * + * @param typeName Type name. + * @param fieldName Field name. + * @param obj Cache object. + * @return Field value from object. + */ + @Nullable protected abstract Object extractField(String typeName, String fieldName, Object obj) + throws CacheException; + + /** + * Construct object from query result. + * + * @param <R> Type of result object. + * @param typeName Type name. + * @param fields Fields descriptors. + * @param loadColIdxs Select query columns index. + * @param rs ResultSet. + * @return Constructed object. + */ + protected abstract <R> R buildObject(String typeName, Collection<CacheTypeFieldMetadata> fields, + Map<String, Integer> loadColIdxs, ResultSet rs) throws CacheLoaderException; + + /** + * Extract key type id from key object. + * + * @param key Key object. + * @return Key type id. + */ + protected abstract Object keyTypeId(Object key) throws CacheException; + + /** + * Extract key type id from key class name. + * + * @param type String description of key type. + * @return Key type id. + */ + protected abstract Object keyTypeId(String type) throws CacheException; + + /** + * Prepare internal store specific builders for provided types metadata. + * + * @param types Collection of types. + * @throws CacheException If failed to prepare. + */ + protected abstract void prepareBuilders(@Nullable String cacheName, Collection<CacheTypeMetadata> types) + throws CacheException; + + /** + * Perform dialect resolution. + * + * @return The resolved dialect. + * @throws CacheException Indicates problems accessing the metadata. 
+ */ + protected JdbcDialect resolveDialect() throws CacheException { + Connection conn = null; + + String dbProductName = null; + + try { + conn = openConnection(false); + + dbProductName = conn.getMetaData().getDatabaseProductName(); + } + catch (SQLException e) { + throw new CacheException("Failed access to metadata for detect database dialect.", e); + } + finally { + U.closeQuiet(conn); + } + + if ("H2".equals(dbProductName)) + return new H2Dialect(); + + if ("MySQL".equals(dbProductName)) + return new MySQLDialect(); + + if (dbProductName.startsWith("Microsoft SQL Server")) + return new SQLServerDialect(); + + if ("Oracle".equals(dbProductName)) + return new OracleDialect(); + + if (dbProductName.startsWith("DB2/")) + return new DB2Dialect(); + + U.warn(log, "Failed to resolve dialect (BasicJdbcDialect will be used): " + dbProductName); + + return new BasicJdbcDialect(); + } + + /** {@inheritDoc} */ + @Override public void start() throws IgniteException { + if (dataSrc == null) + throw new IgniteException("Failed to initialize cache store (data source is not provided)."); + + if (dialect == null) + dialect = resolveDialect(); + } + + /** {@inheritDoc} */ + @Override public void stop() throws IgniteException { + // No-op. + } + + /** + * Gets connection from a pool. + * + * @param autocommit {@code true} If connection should use autocommit mode. + * @return Pooled connection. + * @throws SQLException In case of error. + */ + protected Connection openConnection(boolean autocommit) throws SQLException { + Connection conn = dataSrc.getConnection(); + + conn.setAutoCommit(autocommit); + + return conn; + } + + /** + * @return Connection. + * @throws SQLException In case of error. 
+ */ + protected Connection connection() throws SQLException { + CacheStoreSession ses = session(); + + if (ses.transaction() != null) { + Map<String, Connection> prop = ses.properties(); + + Connection conn = prop.get(ATTR_CONN_PROP); + + if (conn == null) { + conn = openConnection(false); + + // Store connection in session to used it for other operations in the same session. + prop.put(ATTR_CONN_PROP, conn); + } + + return conn; + } + // Transaction can be null in case of simple load operation. + else + return openConnection(true); + } + + /** + * Closes connection. + * + * @param conn Connection to close. + */ + protected void closeConnection(@Nullable Connection conn) { + CacheStoreSession ses = session(); + + // Close connection right away if there is no transaction. + if (ses.transaction() == null) + U.closeQuiet(conn); + } + + /** + * Closes allocated resources depending on transaction status. + * + * @param conn Allocated connection. + * @param st Created statement, + */ + protected void end(@Nullable Connection conn, @Nullable Statement st) { + U.closeQuiet(st); + + closeConnection(conn); + } + + /** {@inheritDoc} */ + @Override public void txEnd(boolean commit) throws CacheWriterException { + CacheStoreSession ses = session(); + + IgniteTx tx = ses.transaction(); + + Connection conn = ses.<String, Connection>properties().remove(ATTR_CONN_PROP); + + if (conn != null) { + assert tx != null; + + try { + if (commit) + conn.commit(); + else + conn.rollback(); + } + catch (SQLException e) { + throw new CacheWriterException( + "Failed to end transaction [xid=" + tx.xid() + ", commit=" + commit + ']', e); + } + finally { + U.closeQuiet(conn); + } + } + + if (tx != null && log.isDebugEnabled()) + log.debug("Transaction ended [xid=" + tx.xid() + ", commit=" + commit + ']'); + } + + /** + * Retrieves the value of the designated column in the current row of this <code>ResultSet</code> object and + * will convert to the requested Java data type. 
+ * + * @param rs Result set. + * @param colIdx Column index in result set. + * @param type Class representing the Java data type to convert the designated column to. + * @return Value in column. + * @throws SQLException If a database access error occurs or this method is called. + */ + protected Object getColumnValue(ResultSet rs, int colIdx, Class<?> type) throws SQLException { + if (type == int.class) + return rs.getInt(colIdx); + + if (type == long.class) + return rs.getLong(colIdx); + + if (type == double.class) + return rs.getDouble(colIdx); + + if (type == boolean.class) + return rs.getBoolean(colIdx); + + if (type == byte.class) + return rs.getByte(colIdx); + + if (type == short.class) + return rs.getShort(colIdx); + + if (type == float.class) + return rs.getFloat(colIdx); + + if (type == Integer.class || type == Long.class || type == Double.class || + type == Byte.class || type == Short.class || type == Float.class) { + Object val = rs.getObject(colIdx); + + if (val != null) { + Number num = (Number)val; + + if (type == Integer.class) + return num.intValue(); + else if (type == Long.class) + return num.longValue(); + else if (type == Double.class) + return num.doubleValue(); + else if (type == Byte.class) + return num.byteValue(); + else if (type == Short.class) + return num.shortValue(); + else if (type == Float.class) + return num.floatValue(); + } + else + return EMPTY_COLUMN_VALUE; + } + + return rs.getObject(colIdx); + } + + /** + * Construct load cache from range. + * + * @param em Type mapping description. + * @param clo Closure that will be applied to loaded values. + * @param lowerBound Lower bound for range. + * @param upperBound Upper bound for range. + * @return Callable for pool submit. 
+ */ + private Callable<Void> loadCacheRange(final EntryMapping em, final IgniteBiInClosure<K, V> clo, + @Nullable final Object[] lowerBound, @Nullable final Object[] upperBound) { + return new Callable<Void>() { + @Override public Void call() throws Exception { + Connection conn = null; + + PreparedStatement stmt = null; + + try { + conn = openConnection(true); + + stmt = conn.prepareStatement(lowerBound == null && upperBound == null + ? em.loadCacheQry + : em.loadCacheRangeQuery(lowerBound != null, upperBound != null)); + + int ix = 1; + + if (lowerBound != null) + for (int i = lowerBound.length; i > 0; i--) + for (int j = 0; j < i; j++) + stmt.setObject(ix++, lowerBound[j]); + + if (upperBound != null) + for (int i = upperBound.length; i > 0; i--) + for (int j = 0; j < i; j++) + stmt.setObject(ix++, upperBound[j]); + + ResultSet rs = stmt.executeQuery(); + + while (rs.next()) { + K key = buildObject(em.keyType(), em.keyColumns(), em.loadColIdxs, rs); + V val = buildObject(em.valueType(), em.valueColumns(), em.loadColIdxs, rs); + + clo.apply(key, val); + } + } + catch (SQLException e) { + throw new IgniteCheckedException("Failed to load cache", e); + } + finally { + U.closeQuiet(stmt); + + U.closeQuiet(conn); + } + + return null; + } + }; + } + + /** + * Construct load cache in one select. + * + * @param m Type mapping description. + * @param clo Closure for loaded values. + * @return Callable for pool submit. + */ + private Callable<Void> loadCacheFull(EntryMapping m, IgniteBiInClosure<K, V> clo) { + return loadCacheRange(m, clo, null, null); + } + + /** + * @return Type mappings for specified cache name. + * @throws CacheException If failed to initialize. 
+ */ + private Map<Object, EntryMapping> cacheMappings(@Nullable String cacheName) throws CacheException { + Map<Object, EntryMapping> entryMappings = cacheMappings.get(cacheName); + + if (entryMappings != null) + return entryMappings; + + cacheMappingsLock.lock(); + + try { + entryMappings = cacheMappings.get(cacheName); + + if (entryMappings != null) + return entryMappings; + + Collection<CacheTypeMetadata> types = ignite().cache(session().cacheName()).configuration() + .getTypeMetadata(); + + entryMappings = U.newHashMap(types.size()); + + for (CacheTypeMetadata type : types) + entryMappings.put(keyTypeId(type.getKeyType()), new EntryMapping(dialect, type)); + + Map<String, Map<Object, EntryMapping>> mappings = new HashMap<>(cacheMappings); + + mappings.put(cacheName, entryMappings); + + prepareBuilders(cacheName, types); + + cacheMappings = mappings; + + return entryMappings; + } + finally { + cacheMappingsLock.unlock(); + } + } + + /** + * @param keyTypeId Key type id. + * @param key Key object. + * @return Entry mapping. + * @throws CacheException if mapping for key was not found. + */ + private EntryMapping entryMapping(Object keyTypeId, Object key) throws CacheException { + String cacheName = session().cacheName(); + + EntryMapping em = cacheMappings(cacheName).get(keyTypeId); + + if (em == null) + throw new CacheException("Failed to find mapping description [key=" + key + + ", cache=" + (cacheName != null ? cacheName : "<default>") + "]"); + + return em; + } + + /** {@inheritDoc} */ + @Override public void loadCache(final IgniteBiInClosure<K, V> clo, @Nullable Object... 
args) + throws CacheLoaderException { + try { + ExecutorService pool = Executors.newFixedThreadPool(maxPoolSz); + + Collection<Future<?>> futs = new ArrayList<>(); + + if (args != null && args.length > 0) { + if (args.length % 2 != 0) + throw new CacheLoaderException("Expected even number of arguments, but found: " + args.length); + + if (log.isDebugEnabled()) + log.debug("Start loading entries from db using user queries from arguments"); + + for (int i = 0; i < args.length; i += 2) { + String keyType = args[i].toString(); + + String selQry = args[i + 1].toString(); + + EntryMapping em = entryMapping(keyTypeId(keyType), keyType); + + futs.add(pool.submit(new LoadCacheCustomQueryWorker<>(em, selQry, clo))); + } + } + else { + Collection<EntryMapping> entryMappings = cacheMappings(session().cacheName()).values(); + + if (log.isDebugEnabled()) + log.debug("Start loading all cache types entries from db"); + + for (EntryMapping em : entryMappings) { + if (parallelLoadCacheMinThreshold > 0) { + Connection conn = null; + + try { + conn = connection(); + + PreparedStatement stmt = conn.prepareStatement(em.loadCacheSelRangeQry); + + stmt.setInt(1, parallelLoadCacheMinThreshold); + + ResultSet rs = stmt.executeQuery(); + + if (rs.next()) { + int keyCnt = em.keyCols.size(); + + Object[] upperBound = new Object[keyCnt]; + + for (int i = 0; i < keyCnt; i++) + upperBound[i] = rs.getObject(i + 1); + + futs.add(pool.submit(loadCacheRange(em, clo, null, upperBound))); + + while (rs.next()) { + Object[] lowerBound = upperBound; + + upperBound = new Object[keyCnt]; + + for (int i = 0; i < keyCnt; i++) + upperBound[i] = rs.getObject(i + 1); + + futs.add(pool.submit(loadCacheRange(em, clo, lowerBound, upperBound))); + } + + futs.add(pool.submit(loadCacheRange(em, clo, upperBound, null))); + } + else + futs.add(pool.submit(loadCacheFull(em, clo))); + } + catch (SQLException ignored) { + futs.add(pool.submit(loadCacheFull(em, clo))); + } + finally { + U.closeQuiet(conn); + } + } + else + 
futs.add(pool.submit(loadCacheFull(em, clo))); + } + } + + for (Future<?> fut : futs) + U.get(fut); + } + catch (IgniteCheckedException e) { + throw new CacheLoaderException("Failed to load cache", e.getCause()); + } + } + + /** {@inheritDoc} */ + @Nullable @Override public V load(K key) throws CacheLoaderException { + assert key != null; + + EntryMapping em = entryMapping(keyTypeId(key), key); + + if (log.isDebugEnabled()) + log.debug("Start load value from database [table= " + em.fullTableName() + ", key=" + key + "]"); + + Connection conn = null; + + PreparedStatement stmt = null; + + try { + conn = connection(); + + stmt = conn.prepareStatement(em.loadQrySingle); + + fillKeyParameters(stmt, em, key); + + ResultSet rs = stmt.executeQuery(); + + if (rs.next()) + return buildObject(em.valueType(), em.valueColumns(), em.loadColIdxs, rs); + } + catch (SQLException e) { + throw new CacheLoaderException("Failed to load object [table=" + em.fullTableName() + + ", key=" + key + "]", e); + } + finally { + end(conn, stmt); + } + + return null; + } + + /** {@inheritDoc} */ + @Override public Map<K, V> loadAll(Iterable<? 
extends K> keys) throws CacheLoaderException { + assert keys != null; + + Connection conn = null; + + try { + conn = connection(); + + Map<Object, LoadWorker<K, V>> workers = U.newHashMap(cacheMappings(session().cacheName()).size()); + + Map<K, V> res = new HashMap<>(); + + for (K key : keys) { + Object keyTypeId = keyTypeId(key); + + EntryMapping em = entryMapping(keyTypeId, key); + + LoadWorker<K, V> worker = workers.get(keyTypeId); + + if (worker == null) + workers.put(keyTypeId, worker = new LoadWorker<>(conn, em)); + + worker.keys.add(key); + + if (worker.keys.size() == em.maxKeysPerStmt) + res.putAll(workers.remove(keyTypeId).call()); + } + + for (LoadWorker<K, V> worker : workers.values()) + res.putAll(worker.call()); + + return res; + } + catch (Exception e) { + throw new CacheWriterException("Failed to load entries from database", e); + } + finally { + closeConnection(conn); + } + } + + /** + * @param insStmt Insert statement. + * @param updStmt Update statement. + * @param em Entry mapping. + * @param entry Cache entry. + */ + private void writeUpsert(PreparedStatement insStmt, PreparedStatement updStmt, + EntryMapping em, Cache.Entry<? extends K, ? 
extends V> entry) throws CacheWriterException { + try { + CacheWriterException we = null; + + for (int attempt = 0; attempt < MAX_ATTEMPT_WRITE_COUNT; attempt++) { + int paramIdx = fillValueParameters(updStmt, 1, em, entry.getValue()); + + fillKeyParameters(updStmt, paramIdx, em, entry.getKey()); + + if (updStmt.executeUpdate() == 0) { + paramIdx = fillKeyParameters(insStmt, em, entry.getKey()); + + fillValueParameters(insStmt, paramIdx, em, entry.getValue()); + + try { + insStmt.executeUpdate(); + + if (attempt > 0) + U.warn(log, "Entry was inserted in database on second try [table=" + em.fullTableName() + + ", entry=" + entry + "]"); + } + catch (SQLException e) { + String sqlState = e.getSQLState(); + + SQLException nested = e.getNextException(); + + while (sqlState == null && nested != null) { + sqlState = nested.getSQLState(); + + nested = nested.getNextException(); + } + + // The error with code 23505 or 23000 is thrown when trying to insert a row that + // would violate a unique index or primary key. 
+ if ("23505".equals(sqlState) || "23000".equals(sqlState)) { + if (we == null) + we = new CacheWriterException("Failed insert entry in database, violate a unique" + + " index or primary key [table=" + em.fullTableName() + ", entry=" + entry + "]"); + + we.addSuppressed(e); + + U.warn(log, "Failed insert entry in database, violate a unique index or primary key" + + " [table=" + em.fullTableName() + ", entry=" + entry + "]"); + + continue; + } + + throw new CacheWriterException("Failed insert entry in database [table=" + em.fullTableName() + + ", entry=" + entry, e); + } + } + + if (attempt > 0) + U.warn(log, "Entry was updated in database on second try [table=" + em.fullTableName() + + ", entry=" + entry + "]"); + + return; + } + + throw we; + } + catch (SQLException e) { + throw new CacheWriterException("Failed update entry in database [table=" + em.fullTableName() + + ", entry=" + entry + "]", e); + } + } + + /** {@inheritDoc} */ + @Override public void write(Cache.Entry<? extends K, ? 
extends V> entry) throws CacheWriterException { + assert entry != null; + + K key = entry.getKey(); + + EntryMapping em = entryMapping(keyTypeId(key), key); + + if (log.isDebugEnabled()) + log.debug("Start write entry to database [table=" + em.fullTableName() + ", entry=" + entry + "]"); + + Connection conn = null; + + try { + conn = connection(); + + if (dialect.hasMerge()) { + PreparedStatement stmt = null; + + try { + stmt = conn.prepareStatement(em.mergeQry); + + int i = fillKeyParameters(stmt, em, key); + + fillValueParameters(stmt, i, em, entry.getValue()); + + int updCnt = stmt.executeUpdate(); + + if (updCnt != 1) + U.warn(log, "Unexpected number of updated entries [table=" + em.fullTableName() + + ", entry=" + entry + "expected=1, actual=" + updCnt + "]"); + } + finally { + U.closeQuiet(stmt); + } + } + else { + PreparedStatement insStmt = null; + + PreparedStatement updStmt = null; + + try { + insStmt = conn.prepareStatement(em.insQry); + + updStmt = conn.prepareStatement(em.updQry); + + writeUpsert(insStmt, updStmt, em, entry); + } + finally { + U.closeQuiet(insStmt); + + U.closeQuiet(updStmt); + } + } + } + catch (SQLException e) { + throw new CacheWriterException("Failed to write entry to database [table=" + em.fullTableName() + + ", entry=" + entry + "]", e); + } + finally { + closeConnection(conn); + } + } + + /** {@inheritDoc} */ + @Override public void writeAll(final Collection<Cache.Entry<? extends K, ? extends V>> entries) + throws CacheWriterException { + assert entries != null; + + Connection conn = null; + + try { + conn = connection(); + + Object currKeyTypeId = null; + + if (dialect.hasMerge()) { + PreparedStatement mergeStmt = null; + + try { + EntryMapping em = null; + + LazyValue<Object[]> lazyEntries = new LazyValue<Object[]>() { + @Override public Object[] create() { + return entries.toArray(); + } + }; + + int fromIdx = 0, prepared = 0; + + for (Cache.Entry<? extends K, ? 
extends V> entry : entries) { + K key = entry.getKey(); + + Object keyTypeId = keyTypeId(key); + + em = entryMapping(keyTypeId, key); + + if (currKeyTypeId == null || !currKeyTypeId.equals(keyTypeId)) { + if (mergeStmt != null) { + executeBatch(em, mergeStmt, "writeAll", fromIdx, prepared, lazyEntries); + + U.closeQuiet(mergeStmt); + } + + mergeStmt = conn.prepareStatement(em.mergeQry); + + currKeyTypeId = keyTypeId; + + prepared = 0; + } + + int i = fillKeyParameters(mergeStmt, em, key); + + fillValueParameters(mergeStmt, i, em, entry.getValue()); + + mergeStmt.addBatch(); + + if (++prepared % batchSz == 0) { + executeBatch(em, mergeStmt, "writeAll", fromIdx, prepared, lazyEntries); + + prepared = 0; + } + } + + if (mergeStmt != null && prepared % batchSz != 0) + executeBatch(em, mergeStmt, "writeAll", fromIdx, prepared, lazyEntries); + } + finally { + U.closeQuiet(mergeStmt); + } + } + else { + PreparedStatement insStmt = null; + + PreparedStatement updStmt = null; + + try { + for (Cache.Entry<? extends K, ? 
extends V> entry : entries) { + K key = entry.getKey(); + + Object keyTypeId = keyTypeId(key); + + EntryMapping em = entryMapping(keyTypeId, key); + + if (currKeyTypeId == null || !currKeyTypeId.equals(keyTypeId)) { + U.closeQuiet(insStmt); + + insStmt = conn.prepareStatement(em.insQry); + + U.closeQuiet(updStmt); + + updStmt = conn.prepareStatement(em.updQry); + + currKeyTypeId = keyTypeId; + } + + writeUpsert(insStmt, updStmt, em, entry); + } + } + finally { + U.closeQuiet(insStmt); + + U.closeQuiet(updStmt); + } + } + } + catch (SQLException e) { + throw new CacheWriterException("Failed to write entries in database", e); + } + finally { + closeConnection(conn); + } + } + + /** {@inheritDoc} */ + @Override public void delete(Object key) throws CacheWriterException { + assert key != null; + + EntryMapping em = entryMapping(keyTypeId(key), key); + + if (log.isDebugEnabled()) + log.debug("Start remove value from database [table=" + em.fullTableName() + ", key=" + key + "]"); + + Connection conn = null; + + PreparedStatement stmt = null; + + try { + conn = connection(); + + stmt = conn.prepareStatement(em.remQry); + + fillKeyParameters(stmt, em, key); + + int delCnt = stmt.executeUpdate(); + + if (delCnt != 1) + U.warn(log, "Unexpected number of deleted entries [table=" + em.fullTableName() + ", key=" + key + + "expected=1, actual=" + delCnt + "]"); + } + catch (SQLException e) { + throw new CacheWriterException("Failed to remove value from database [table=" + em.fullTableName() + + ", key=" + key + "]", e); + } + finally { + end(conn, stmt); + } + } + + /** + * @param em Entry mapping. + * @param stmt Statement. + * @param desc Statement description for error message. + * @param fromIdx Objects in batch start from index. + * @param prepared Expected objects in batch. + * @param lazyObjs All objects used in batch statement as array. 
+ */ + private void executeBatch(EntryMapping em, Statement stmt, String desc, int fromIdx, int prepared, + LazyValue<Object[]> lazyObjs) throws SQLException { + try { + int[] rowCounts = stmt.executeBatch(); + + int numOfRowCnt = rowCounts.length; + + if (numOfRowCnt != prepared) + U.warn(log, "Unexpected number of updated rows [table=" + em.fullTableName() + ", expected=" + prepared + + ", actual=" + numOfRowCnt + "]"); + + for (int i = 0; i < numOfRowCnt; i++) { + int cnt = rowCounts[i]; + + if (cnt != 1 && cnt != SUCCESS_NO_INFO) { + Object[] objs = lazyObjs.value(); + + U.warn(log, "Batch " + desc + " returned unexpected updated row count [table=" + em.fullTableName() + + ", entry=" + objs[fromIdx + i] + ", expected=1, actual=" + cnt + "]"); + } + } + } + catch (BatchUpdateException be) { + int[] rowCounts = be.getUpdateCounts(); + + for (int i = 0; i < rowCounts.length; i++) { + if (rowCounts[i] == EXECUTE_FAILED) { + Object[] objs = lazyObjs.value(); + + U.warn(log, "Batch " + desc + " failed on execution [table=" + em.fullTableName() + + ", entry=" + objs[fromIdx + i] + "]"); + } + } + + throw be; + } + } + + /** {@inheritDoc} */ + @Override public void deleteAll(final Collection<?> keys) throws CacheWriterException { + assert keys != null; + + Connection conn = null; + + try { + conn = connection(); + + Object currKeyTypeId = null; + + EntryMapping em = null; + + PreparedStatement delStmt = null; + + LazyValue<Object[]> lazyKeys = new LazyValue<Object[]>() { + @Override public Object[] create() { + return keys.toArray(); + } + }; + + int fromIdx = 0, prepared = 0; + + for (Object key : keys) { + Object keyTypeId = keyTypeId(key); + + em = entryMapping(keyTypeId, key); + + if (delStmt == null) { + delStmt = conn.prepareStatement(em.remQry); + + currKeyTypeId = keyTypeId; + } + + if (!currKeyTypeId.equals(keyTypeId)) { + executeBatch(em, delStmt, "deleteAll", fromIdx, prepared, lazyKeys); + + fromIdx += prepared; + + prepared = 0; + + currKeyTypeId = 
keyTypeId; + } + + fillKeyParameters(delStmt, em, key); + + delStmt.addBatch(); + + if (++prepared % batchSz == 0) { + executeBatch(em, delStmt, "deleteAll", fromIdx, prepared, lazyKeys); + + fromIdx += prepared; + + prepared = 0; + } + } + + if (delStmt != null && prepared % batchSz != 0) + executeBatch(em, delStmt, "deleteAll", fromIdx, prepared, lazyKeys); + } + catch (SQLException e) { + throw new CacheWriterException("Failed to remove values from database", e); + } + finally { + closeConnection(conn); + } + } + + /** + * @param stmt Prepare statement. + * @param i Start index for parameters. + * @param em Entry mapping. + * @param key Key object. + * @return Next index for parameters. + */ + protected int fillKeyParameters(PreparedStatement stmt, int i, EntryMapping em, + Object key) throws CacheException { + for (CacheTypeFieldMetadata field : em.keyColumns()) { + Object fieldVal = extractField(em.keyType(), field.getJavaName(), key); + + try { + if (fieldVal != null) + stmt.setObject(i++, fieldVal); + else + stmt.setNull(i++, field.getDatabaseType()); + } + catch (SQLException e) { + throw new CacheException("Failed to set statement parameter name: " + field.getDatabaseName(), e); + } + } + + return i; + } + + /** + * @param stmt Prepare statement. + * @param m Type mapping description. + * @param key Key object. + * @return Next index for parameters. + */ + protected int fillKeyParameters(PreparedStatement stmt, EntryMapping m, Object key) throws CacheException { + return fillKeyParameters(stmt, 1, m, key); + } + + /** + * @param stmt Prepare statement. + * @param i Start index for parameters. + * @param m Type mapping description. + * @param val Value object. + * @return Next index for parameters. 
+ */ + protected int fillValueParameters(PreparedStatement stmt, int i, EntryMapping m, Object val) + throws CacheWriterException { + for (CacheTypeFieldMetadata field : m.uniqValFields) { + Object fieldVal = extractField(m.valueType(), field.getJavaName(), val); + + try { + if (fieldVal != null) + stmt.setObject(i++, fieldVal); + else + stmt.setNull(i++, field.getDatabaseType()); + } + catch (SQLException e) { + throw new CacheWriterException("Failed to set statement parameter name: " + field.getDatabaseName(), e); + } + } + + return i; + } + + /** + * @return Data source. + */ + public DataSource getDataSource() { + return dataSrc; + } + + /** + * @param dataSrc Data source. + */ + public void setDataSource(DataSource dataSrc) { + this.dataSrc = dataSrc; + } + + /** + * Get database dialect. + * + * @return Database dialect. + */ + public JdbcDialect getDialect() { + return dialect; + } + + /** + * Set database dialect. + * + * @param dialect Database dialect. + */ + public void setDialect(JdbcDialect dialect) { + this.dialect = dialect; + } + + /** + * Get Max workers thread count. These threads are responsible for execute query. + * + * @return Max workers thread count. + */ + public int getMaxPoolSize() { + return maxPoolSz; + } + + /** + * Set Max workers thread count. These threads are responsible for execute query. + * + * @param maxPoolSz Max workers thread count. + */ + public void setMaxPoolSize(int maxPoolSz) { + this.maxPoolSz = maxPoolSz; + } + + /** + * Get maximum batch size for delete and delete operations. + * + * @return Maximum batch size. + */ + public int getBatchSize() { + return batchSz; + } + + /** + * Set maximum batch size for write and delete operations. + * + * @param batchSz Maximum batch size. + */ + public void setBatchSize(int batchSz) { + this.batchSz = batchSz; + } + + /** + * Parallel load cache minimum row count threshold. + * + * @return If {@code 0} then load sequentially. 
*/
    public int getParallelLoadCacheMinimumThreshold() {
        return parallelLoadCacheMinThreshold;
    }

    /**
     * Parallel load cache minimum row count threshold.
     *
     * @param parallelLoadCacheMinThreshold Minimum row count threshold. If {@code 0} then load sequentially.
     */
    public void setParallelLoadCacheMinimumThreshold(int parallelLoadCacheMinThreshold) {
        this.parallelLoadCacheMinThreshold = parallelLoadCacheMinThreshold;
    }

    /**
     * Entry mapping description: type metadata of one key/value pair together with the SQL statements
     * that the dialect generated for its table.
     */
    protected static class EntryMapping {
        /** Database dialect. */
        private final JdbcDialect dialect;

        /** Select border for range queries. */
        protected final String loadCacheSelRangeQry;

        /** Select all items query. */
        protected final String loadCacheQry;

        /** Select item query. */
        protected final String loadQrySingle;

        /** Select items query. */
        private final String loadQry;

        /** Merge item(s) query. */
        protected final String mergeQry;

        /** Insert item query. */
        protected final String insQry;

        /** Update item query. */
        protected final String updQry;

        /** Remove item(s) query. */
        protected final String remQry;

        /** Max key count for load query per statement. */
        protected final int maxKeysPerStmt;

        /** Database key columns. */
        private final Collection<String> keyCols;

        /** Database key columns followed by unique value columns. */
        private final Collection<String> cols;

        /** Select query columns index. */
        private final Map<String, Integer> loadColIdxs;

        /** Unique value fields. */
        private final Collection<CacheTypeFieldMetadata> uniqValFields;

        /** Type metadata. */
        private final CacheTypeMetadata typeMeta;

        /** Full table name. */
        private final String fullTblName;

        /**
         * @param dialect Database dialect used to generate the queries.
         * @param typeMeta Type metadata.
         */
        public EntryMapping(JdbcDialect dialect, CacheTypeMetadata typeMeta) {
            this.dialect = dialect;

            this.typeMeta = typeMeta;

            final Collection<CacheTypeFieldMetadata> keyFlds = typeMeta.getKeyFields();

            // Value fields that also appear among the key fields are filtered out.
            uniqValFields = F.view(typeMeta.getValueFields(), new IgnitePredicate<CacheTypeFieldMetadata>() {
                @Override public boolean apply(CacheTypeFieldMetadata fld) {
                    return !keyFlds.contains(fld);
                }
            });

            String schemaName = typeMeta.getDatabaseSchema();

            String tblName = typeMeta.getDatabaseTable();

            if (F.isEmpty(schemaName))
                fullTblName = tblName;
            else
                fullTblName = schemaName + "." + tblName;

            keyCols = databaseColumns(keyFlds);

            Collection<String> uniqValCols = databaseColumns(uniqValFields);

            cols = F.concat(false, keyCols, uniqValCols);

            loadColIdxs = U.newHashMap(cols.size());

            // Result set positions are 1-based and follow the order of the selected columns.
            int pos = 1;

            for (String colName : cols) {
                loadColIdxs.put(colName, pos);

                pos++;
            }

            loadCacheQry = dialect.loadCacheQuery(fullTblName, cols);

            loadCacheSelRangeQry = dialect.loadCacheSelectRangeQuery(fullTblName, keyCols);

            loadQrySingle = dialect.loadQuery(fullTblName, keyCols, cols, 1);

            maxKeysPerStmt = dialect.getMaxParamsCnt() / keyCols.size();

            loadQry = dialect.loadQuery(fullTblName, keyCols, cols, maxKeysPerStmt);

            insQry = dialect.insertQuery(fullTblName, keyCols, uniqValCols);

            updQry = dialect.updateQuery(fullTblName, keyCols, uniqValCols);

            mergeQry = dialect.mergeQuery(fullTblName, keyCols, uniqValCols);

            remQry = dialect.removeQuery(fullTblName, keyCols);
        }

        /**
         * Extract database column names from {@link CacheTypeFieldMetadata}.
         *
         * @param dsc collection of {@link CacheTypeFieldMetadata}.
         * @return Database column names of the given fields.
         */
        private static Collection<String> databaseColumns(Collection<CacheTypeFieldMetadata> dsc) {
            C1<CacheTypeFieldMetadata, String> toDbName = new C1<CacheTypeFieldMetadata, String>() {
                /** {@inheritDoc} */
                @Override public String apply(CacheTypeFieldMetadata fld) {
                    return fld.getDatabaseName();
                }
            };

            return F.transform(dsc, toDbName);
        }

        /**
         * Construct query for select values with key count less or equal {@code maxKeysPerStmt}
         *
         * @param keyCnt Key count.
         * @return Select query for the given number of keys.
         */
        protected String loadQuery(int keyCnt) {
            assert keyCnt <= maxKeysPerStmt;

            // The two most frequent sizes are pre-built in the constructor.
            if (keyCnt == maxKeysPerStmt)
                return loadQry;
            else if (keyCnt == 1)
                return loadQrySingle;
            else
                return dialect.loadQuery(fullTblName, keyCols, cols, keyCnt);
        }

        /**
         * Construct query for select values in range.
         *
         * @param appendLowerBound Need add lower bound for range.
         * @param appendUpperBound Need add upper bound for range.
         * @return Query with range.
         */
        protected String loadCacheRangeQuery(boolean appendLowerBound, boolean appendUpperBound) {
            return dialect.loadCacheRangeQuery(fullTblName, keyCols, cols, appendLowerBound, appendUpperBound);
        }

        /**
         * @return Key type.
         */
        protected String keyType() {
            return typeMeta.getKeyType();
        }

        /**
         * @return Value type.
         */
        protected String valueType() {
            return typeMeta.getValueType();
        }

        /**
         * Gets key columns.
         *
         * @return Key columns.
         */
        protected Collection<CacheTypeFieldMetadata> keyColumns() {
            return typeMeta.getKeyFields();
        }

        /**
         * Gets value columns.
         *
         * @return Value columns.
         */
        protected Collection<CacheTypeFieldMetadata> valueColumns() {
            return typeMeta.getValueFields();
        }

        /**
         * Get full table name.
         *
         * @return &lt;schema&gt;.&lt;table name&gt;
         */
        protected String fullTableName() {
            return fullTblName;
        }
    }

    /**
     * Worker for load cache using custom user query.
     *
     * @param <K1> Key type.
     * @param <V1> Value type.
     */
    private class LoadCacheCustomQueryWorker<K1, V1> implements Callable<Void> {
        /** Entry mapping description. */
        private final EntryMapping em;

        /** User query. */
        private final String qry;

        /** Closure for loaded values. */
        private final IgniteBiInClosure<K1, V1> clo;

        /**
         * @param em Entry mapping description.
         * @param qry User query.
         * @param clo Closure for loaded values.
         */
        private LoadCacheCustomQueryWorker(EntryMapping em, String qry, IgniteBiInClosure<K1, V1> clo) {
            this.em = em;
            this.qry = qry;
            this.clo = clo;
        }

        /** {@inheritDoc} */
        @Override public Void call() throws Exception {
            Connection conn = null;

            PreparedStatement stmt = null;

            try {
                conn = openConnection(true);

                stmt = conn.prepareStatement(qry);

                ResultSet rows = stmt.executeQuery();

                ResultSetMetaData meta = rows.getMetaData();

                int colCnt = meta.getColumnCount();

                // For a user query the column positions are discovered from the result set labels.
                Map<String, Integer> idxByLabel = U.newHashMap(colCnt);

                for (int pos = 1; pos <= colCnt; pos++)
                    idxByLabel.put(meta.getColumnLabel(pos), pos);

                while (rows.next()) {
                    K1 key = buildObject(em.keyType(), em.keyColumns(), idxByLabel, rows);
                    V1 val = buildObject(em.valueType(), em.valueColumns(), idxByLabel, rows);

                    clo.apply(key, val);
                }

                return null;
            }
            catch (SQLException e) {
                throw new CacheLoaderException("Failed to execute custom query for load cache", e);
            }
            finally {
                U.closeQuiet(stmt);

                U.closeQuiet(conn);
            }
        }
    }

    /**
     * Lazy initialization of value.
     *
     * @param <T> Cached object type
     */
    private abstract static class LazyValue<T> {
        /** Cached value. */
        private T val;

        /**
         * @return Construct value.
         */
        protected abstract T create();

        /**
         * @return Value.
         */
        public T value() {
            // Created on first access and reused afterwards.
            if (val != null)
                return val;

            val = create();

            return val;
        }
    }

    /**
     * Worker for load by keys.
     *
     * @param <K1> Key type.
     * @param <V1> Value type.
     */
    private class LoadWorker<K1, V1> implements Callable<Map<K1, V1>> {
        /** Connection. */
        private final Connection conn;

        /** Keys for load. */
        private final Collection<K1> keys;

        /** Entry mapping description. */
        private final EntryMapping em;

        /**
         * @param conn Connection.
         * @param em Entry mapping description.
         */
        private LoadWorker(Connection conn, EntryMapping em) {
            this.conn = conn;
            this.em = em;

            keys = new ArrayList<>(em.maxKeysPerStmt);
        }

        /** {@inheritDoc} */
        @Override public Map<K1, V1> call() throws Exception {
            PreparedStatement stmt = null;

            try {
                stmt = conn.prepareStatement(em.loadQuery(keys.size()));

                int paramIdx = 1;

                // Bind the key fields of every collected key, key after key.
                for (Object key : keys) {
                    for (CacheTypeFieldMetadata fld : em.keyColumns()) {
                        Object extracted = extractField(em.keyType(), fld.getJavaName(), key);

                        if (extracted == null)
                            stmt.setNull(paramIdx++, fld.getDatabaseType());
                        else
                            stmt.setObject(paramIdx++, extracted);
                    }
                }

                ResultSet rows = stmt.executeQuery();

                Map<K1, V1> loaded = U.newHashMap(keys.size());

                while (rows.next()) {
                    K1 key = buildObject(em.keyType(), em.keyColumns(), em.loadColIdxs, rows);
                    V1 val = buildObject(em.valueType(), em.valueColumns(), em.loadColIdxs, rows);

                    loaded.put(key, val);
                }

                return loaded;
            }
            finally {
                U.closeQuiet(stmt);
            }
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/33834a34/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/33834a34/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/33834a34/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/33834a34/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------