Repository: incubator-ignite Updated Branches: refs/heads/ignite-494b 2340f5f26 -> 4329e3918
ignite-341 - fix attempt Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/2c871643 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/2c871643 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/2c871643 Branch: refs/heads/ignite-494b Commit: 2c8716430db5f4461c34974b9266a21ca90c0372 Parents: 6837e12 Author: S.Vladykin <svlady...@gridgain.com> Authored: Mon Mar 23 04:30:53 2015 +0300 Committer: S.Vladykin <svlady...@gridgain.com> Committed: Mon Mar 23 04:30:53 2015 +0300 ---------------------------------------------------------------------- .../distributed/dht/GridDhtLocalPartition.java | 111 ++++++- .../ignite/internal/util/lang/GridFunc.java | 14 + ...CacheOffheapTieredMultithreadedSelfTest.java | 324 +++++++++++++++++++ 3 files changed, 434 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2c871643/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java index 92c62d7..66f555c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java @@ -22,6 +22,7 @@ import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*; import org.apache.ignite.internal.processors.cache.version.*; +import org.apache.ignite.internal.processors.query.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.lang.*; @@ -32,6 +33,7 @@ import org.apache.ignite.lang.*; import org.jdk8.backport.*; import org.jetbrains.annotations.*; +import javax.cache.*; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; @@ -420,7 +422,8 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition> * @return Future for evict attempt. */ private IgniteInternalFuture<Boolean> tryEvictAsync(boolean updateSeq) { - if (map.isEmpty() && state.compareAndSet(RENTING, EVICTED, 0, 0)) { + if (map.isEmpty() && !GridQueryProcessor.isEnabled(cctx.config()) && + state.compareAndSet(RENTING, EVICTED, 0, 0)) { if (log.isDebugEnabled()) log.debug("Evicted partition: " + this); @@ -460,7 +463,8 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition> if (log.isDebugEnabled()) log.debug("Evicted partition: " + this); - clearSwap(); + if (!GridQueryProcessor.isEnabled(cctx.config())) + clearSwap(); if (cctx.isDrEnabled()) cctx.dr().partitionEvicted(id); @@ -484,6 +488,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition> */ private void clearSwap() { assert state() == EVICTED; + assert !GridQueryProcessor.isEnabled(cctx.config()) : "Indexing needs to have unswapped values."; try { GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>> it = cctx.swap().iterator(id); @@ -536,27 +541,103 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition> boolean rec = cctx.events().isRecordable(EVT_CACHE_REBALANCE_OBJECT_UNLOADED); - for (Iterator<GridDhtCacheEntry> it = map.values().iterator(); it.hasNext();) { - GridDhtCacheEntry cached = it.next(); + Iterator<GridDhtCacheEntry> it = map.values().iterator(); + + GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>> swapIt = null; + + if (swap && GridQueryProcessor.isEnabled(cctx.config())) { // Indexing needs to unswap cache values. + Iterator<GridDhtCacheEntry> unswapIt = null; try { - if (cached.clearInternal(clearVer, swap)) { - it.remove(); + swapIt = cctx.swap().iterator(id); + unswapIt = unswapIterator(swapIt); + } + catch (Exception e) { + U.error(log, "Failed to clear swap for evicted partition: " + this, e); + } + + if (unswapIt != null) + it = F.concat(it, unswapIt); + } + + try { + while (it.hasNext()) { + GridDhtCacheEntry cached = it.next(); - if (!cached.isInternal()) { - mapPubSize.decrement(); + try { + if (cached.clearInternal(clearVer, swap)) { + it.remove(); - if (rec) - cctx.events().addEvent(cached.partition(), cached.key(), cctx.localNodeId(), (IgniteUuid)null, - null, EVT_CACHE_REBALANCE_OBJECT_UNLOADED, null, false, cached.rawGet(), - cached.hasValue(), null, null, null); + if (!cached.isInternal()) { + mapPubSize.decrement(); + + if (rec) + cctx.events().addEvent(cached.partition(), cached.key(), cctx.localNodeId(), + (IgniteUuid)null, null, EVT_CACHE_REBALANCE_OBJECT_UNLOADED, null, false, + cached.rawGet(), cached.hasValue(), null, null, null); + } } } - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to clear cache entry for evicted partition: " + cached, e); + catch (IgniteCheckedException e) { + U.error(log, "Failed to clear cache entry for evicted partition: " + cached, e); + } } } + finally { + U.close(swapIt, log); + } + } + + /** + * @param it Swap iterator. + * @return Unswapping iterator over swapped entries. + */ + private Iterator<GridDhtCacheEntry> unswapIterator( + final GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>> it) { + if (it == null) + return null; + + return new Iterator<GridDhtCacheEntry>() { + /** */ + KeyCacheObject lastKey; + + @Override public boolean hasNext() { + return it.hasNext(); + } + + @Override public GridDhtCacheEntry next() { + Map.Entry<byte[], GridCacheSwapEntry> entry = it.next(); + + byte[] keyBytes = entry.getKey(); + + try { + lastKey = cctx.toCacheKeyObject(keyBytes); + + GridDhtCacheEntry res = (GridDhtCacheEntry)cctx.cache().entryEx(lastKey, false); + + res.unswap(true, true); + + return res; + } + catch (IgniteCheckedException e) { + throw new CacheException(e); + } + } + + @Override public void remove() { + if (lastKey == null) + throw new IllegalStateException(); + + map.remove(lastKey); + + try { + cctx.swap().remove(lastKey); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to remove swap entry for key: " + lastKey); + } + } + }; } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2c871643/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java index 4cbb40c..d5b95af 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java @@ -1924,6 +1924,20 @@ public class GridFunc { /** * Concatenates multiple iterators as single one. * + * @param iters Iterators. + * @return Single iterator. + */ + @SuppressWarnings("unchecked") + public static <T> Iterator<T> concat(Iterator<T> ... iters) { + if (iters.length == 1) + return iters[0]; + + return concat(asList(iters).iterator()); + } + + /** + * Concatenates multiple iterators as single one. + * * @param iters Iterator over iterators. * @return Single iterator. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2c871643/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapTieredMultithreadedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapTieredMultithreadedSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapTieredMultithreadedSelfTest.java new file mode 100644 index 0000000..24dd89e --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapTieredMultithreadedSelfTest.java @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.query.*; +import org.apache.ignite.cache.query.annotations.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.junits.common.*; + +import javax.cache.*; +import java.io.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.cache.CacheMode.*; + +/** + * Based on Yardstick benchmark. + */ +public class IgniteCacheOffheapTieredMultithreadedSelfTest extends GridCommonAbstractTest { + /** */ + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final int RANGE = 1_000_000; + + /** */ + private static IgniteCache<Integer, Object> cache; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(ipFinder); + + cfg.setDiscoverySpi(disco); + + CacheConfiguration<?,?> cacheCfg = new CacheConfiguration<>(); + + cacheCfg.setCacheMode(PARTITIONED); + cacheCfg.setAtomicityMode(ATOMIC); + cacheCfg.setSwapEnabled(false); + cacheCfg.setBackups(1); + cacheCfg.setMemoryMode(CacheMemoryMode.OFFHEAP_TIERED); + cacheCfg.setIndexedTypes( + Integer.class, Person.class + ); + + cfg.setCacheConfiguration(cacheCfg); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + startGridsMultiThreaded(3); + + cache = grid(0).jcache(null); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 10 * 60 * 1000; + } + + /** + * @throws Exception If failed. + */ + public void testQueryPut() throws Exception { + final AtomicBoolean end = new AtomicBoolean(); + + IgniteInternalFuture<?> fut = multithreadedAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + while(!end.get()) { + if (rnd.nextInt(5) == 0) { + double salary = rnd.nextDouble() * RANGE * 1000; + + double maxSalary = salary + 1000; + + Collection<Cache.Entry<Integer, Object>> entries = executeQuery(salary, maxSalary); + + for (Cache.Entry<Integer, Object> entry : entries) { + Person p = (Person)entry.getValue(); + + if (p.getSalary() < salary || p.getSalary() > maxSalary) + throw new Exception("Invalid person retrieved [min=" + salary + ", max=" + maxSalary + + ", person=" + p + ']'); + } + } + else { + int i = rnd.nextInt(RANGE); + + cache.put(i, new Person(i, "firstName" + i, "lastName" + i, i * 1000)); + } + } + + return null; + } + }, 5); + + Thread.sleep(5 * 60 * 1000); + + end.set(true); + + fut.get(); + } + + + + /** + * @param minSalary Min salary. + * @param maxSalary Max salary. + * @return Query result. + * @throws Exception If failed. + */ + private Collection<Cache.Entry<Integer, Object>> executeQuery(double minSalary, double maxSalary) throws Exception { + SqlQuery qry = new SqlQuery(Person.class, "salary >= ? and salary <= ?"); + + qry.setArgs(minSalary, maxSalary); + + return cache.query(qry).getAll(); + } + + /** + * Person record used for query test. + */ + public static class Person implements Externalizable { + /** Person ID. */ + @QuerySqlField(index = true) + private int id; + + /** Organization ID. */ + @QuerySqlField(index = true) + private int orgId; + + /** First name (not-indexed). */ + @QuerySqlField + private String firstName; + + /** Last name (not indexed). */ + @QuerySqlField + private String lastName; + + /** Salary. */ + @QuerySqlField(index = true) + private double salary; + + /** + * Constructs empty person. + */ + public Person() { + // No-op. + } + + /** + * Constructs person record that is not linked to any organization. + * + * @param id Person ID. + * @param firstName First name. + * @param lastName Last name. + * @param salary Salary. + */ + public Person(int id, String firstName, String lastName, double salary) { + this(id, 0, firstName, lastName, salary); + } + + /** + * Constructs person record. + * + * @param id Person ID. + * @param orgId Organization ID. + * @param firstName First name. + * @param lastName Last name. + * @param salary Salary. + */ + public Person(int id, int orgId, String firstName, String lastName, double salary) { + this.id = id; + this.orgId = orgId; + this.firstName = firstName; + this.lastName = lastName; + this.salary = salary; + } + + /** + * @return Person id. + */ + public int getId() { + return id; + } + + /** + * @param id Person id. + */ + public void setId(int id) { + this.id = id; + } + + /** + * @return Organization id. + */ + public int getOrganizationId() { + return orgId; + } + + /** + * @param orgId Organization id. + */ + public void setOrganizationId(int orgId) { + this.orgId = orgId; + } + + /** + * @return Person first name. + */ + public String getFirstName() { + return firstName; + } + + /** + * @param firstName Person first name. + */ + public void setFirstName(String firstName) { + this.firstName = firstName; + } + + /** + * @return Person last name. + */ + public String getLastName() { + return lastName; + } + + /** + * @param lastName Person last name. + */ + public void setLastName(String lastName) { + this.lastName = lastName; + } + + /** + * @return Salary. + */ + public double getSalary() { + return salary; + } + + /** + * @param salary Salary. + */ + public void setSalary(double salary) { + this.salary = salary; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeInt(id); + out.writeInt(orgId); + out.writeUTF(firstName); + out.writeUTF(lastName); + out.writeDouble(salary); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + id = in.readInt(); + orgId = in.readInt(); + firstName = in.readUTF(); + lastName = in.readUTF(); + salary = in.readDouble(); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + return this == o || (o instanceof Person) && id == ((Person)o).id; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return id; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return "Person [firstName=" + firstName + + ", id=" + id + + ", orgId=" + orgId + + ", lastName=" + lastName + + ", salary=" + salary + + ']'; + } + } +}