Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-494b 2340f5f26 -> 4329e3918


ignite-341 - fix attempt


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/2c871643
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/2c871643
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/2c871643

Branch: refs/heads/ignite-494b
Commit: 2c8716430db5f4461c34974b9266a21ca90c0372
Parents: 6837e12
Author: S.Vladykin <svlady...@gridgain.com>
Authored: Mon Mar 23 04:30:53 2015 +0300
Committer: S.Vladykin <svlady...@gridgain.com>
Committed: Mon Mar 23 04:30:53 2015 +0300

----------------------------------------------------------------------
 .../distributed/dht/GridDhtLocalPartition.java  | 111 ++++++-
 .../ignite/internal/util/lang/GridFunc.java     |  14 +
 ...CacheOffheapTieredMultithreadedSelfTest.java | 324 +++++++++++++++++++
 3 files changed, 434 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2c871643/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index 92c62d7..66f555c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@ -22,6 +22,7 @@ import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*;
 import org.apache.ignite.internal.processors.cache.version.*;
+import org.apache.ignite.internal.processors.query.*;
 import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.future.*;
 import org.apache.ignite.internal.util.lang.*;
@@ -32,6 +33,7 @@ import org.apache.ignite.lang.*;
 import org.jdk8.backport.*;
 import org.jetbrains.annotations.*;
 
+import javax.cache.*;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
@@ -420,7 +422,8 @@ public class GridDhtLocalPartition implements 
Comparable<GridDhtLocalPartition>
      * @return Future for evict attempt.
      */
     private IgniteInternalFuture<Boolean> tryEvictAsync(boolean updateSeq) {
-        if (map.isEmpty() && state.compareAndSet(RENTING, EVICTED, 0, 0)) {
+        if (map.isEmpty() && !GridQueryProcessor.isEnabled(cctx.config()) &&
+            state.compareAndSet(RENTING, EVICTED, 0, 0)) {
             if (log.isDebugEnabled())
                 log.debug("Evicted partition: " + this);
 
@@ -460,7 +463,8 @@ public class GridDhtLocalPartition implements 
Comparable<GridDhtLocalPartition>
             if (log.isDebugEnabled())
                 log.debug("Evicted partition: " + this);
 
-            clearSwap();
+            if (!GridQueryProcessor.isEnabled(cctx.config()))
+                clearSwap();
 
             if (cctx.isDrEnabled())
                 cctx.dr().partitionEvicted(id);
@@ -484,6 +488,7 @@ public class GridDhtLocalPartition implements 
Comparable<GridDhtLocalPartition>
      */
     private void clearSwap() {
         assert state() == EVICTED;
+        assert !GridQueryProcessor.isEnabled(cctx.config()) : "Indexing needs 
to have unswapped values.";
 
         try {
             GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>> it = 
cctx.swap().iterator(id);
@@ -536,27 +541,103 @@ public class GridDhtLocalPartition implements 
Comparable<GridDhtLocalPartition>
 
         boolean rec = 
cctx.events().isRecordable(EVT_CACHE_REBALANCE_OBJECT_UNLOADED);
 
-        for (Iterator<GridDhtCacheEntry> it = map.values().iterator(); 
it.hasNext();) {
-            GridDhtCacheEntry cached = it.next();
+        Iterator<GridDhtCacheEntry> it = map.values().iterator();
+
+        GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>> swapIt = 
null;
+
+        if (swap && GridQueryProcessor.isEnabled(cctx.config())) { // Indexing 
needs to unswap cache values.
+            Iterator<GridDhtCacheEntry> unswapIt = null;
 
             try {
-                if (cached.clearInternal(clearVer, swap)) {
-                    it.remove();
+                swapIt = cctx.swap().iterator(id);
+                unswapIt = unswapIterator(swapIt);
+            }
+            catch (Exception e) {
+                U.error(log, "Failed to clear swap for evicted partition: " + 
this, e);
+            }
+
+            if (unswapIt != null)
+                it = F.concat(it, unswapIt);
+        }
+
+        try {
+            while (it.hasNext()) {
+                GridDhtCacheEntry cached = it.next();
 
-                    if (!cached.isInternal()) {
-                        mapPubSize.decrement();
+                try {
+                    if (cached.clearInternal(clearVer, swap)) {
+                        it.remove();
 
-                        if (rec)
-                            cctx.events().addEvent(cached.partition(), 
cached.key(), cctx.localNodeId(), (IgniteUuid)null,
-                                null, EVT_CACHE_REBALANCE_OBJECT_UNLOADED, 
null, false, cached.rawGet(),
-                                cached.hasValue(), null, null, null);
+                        if (!cached.isInternal()) {
+                            mapPubSize.decrement();
+
+                            if (rec)
+                                cctx.events().addEvent(cached.partition(), 
cached.key(), cctx.localNodeId(),
+                                    (IgniteUuid)null, null, 
EVT_CACHE_REBALANCE_OBJECT_UNLOADED, null, false,
+                                    cached.rawGet(), cached.hasValue(), null, 
null, null);
+                        }
                     }
                 }
-            }
-            catch (IgniteCheckedException e) {
-                U.error(log, "Failed to clear cache entry for evicted 
partition: " + cached, e);
+                catch (IgniteCheckedException e) {
+                    U.error(log, "Failed to clear cache entry for evicted 
partition: " + cached, e);
+                }
             }
         }
+        finally {
+            U.close(swapIt, log);
+        }
+    }
+
+    /**
+     * @param it Swap iterator.
+     * @return Unswapping iterator over swapped entries.
+     */
+    private Iterator<GridDhtCacheEntry> unswapIterator(
+        final GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>> it) 
{
+        if (it == null)
+            return null;
+
+        return new Iterator<GridDhtCacheEntry>() {
+            /** */
+            KeyCacheObject lastKey;
+
+            @Override public boolean hasNext() {
+                return it.hasNext();
+            }
+
+            @Override public GridDhtCacheEntry next() {
+                Map.Entry<byte[], GridCacheSwapEntry> entry = it.next();
+
+                byte[] keyBytes = entry.getKey();
+
+                try {
+                    lastKey = cctx.toCacheKeyObject(keyBytes);
+
+                    GridDhtCacheEntry res = 
(GridDhtCacheEntry)cctx.cache().entryEx(lastKey, false);
+
+                    res.unswap(true, true);
+
+                    return res;
+                }
+                catch (IgniteCheckedException e) {
+                    throw new CacheException(e);
+                }
+            }
+
+            @Override public void remove() {
+                if (lastKey == null)
+                    throw new IllegalStateException();
+
+                map.remove(lastKey);
+
+                try {
+                    cctx.swap().remove(lastKey);
+                }
+                catch (IgniteCheckedException e) {
+                    U.error(log, "Failed to remove swap entry for key: " + 
lastKey);
+                }
+            }
+        };
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2c871643/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java 
b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
index 4cbb40c..d5b95af 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
@@ -1924,6 +1924,20 @@ public class GridFunc {
     /**
      * Concatenates multiple iterators as single one.
      *
+     * @param iters Iterators.
+     * @return Single iterator.
+     */
+    @SuppressWarnings("unchecked")
+    public static <T> Iterator<T> concat(Iterator<T> ... iters) {
+        if (iters.length == 1)
+            return iters[0];
+
+        return concat(asList(iters).iterator());
+    }
+
+    /**
+     * Concatenates multiple iterators as single one.
+     *
      * @param iters Iterator over iterators.
      * @return Single iterator.
      */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2c871643/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapTieredMultithreadedSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapTieredMultithreadedSelfTest.java
 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapTieredMultithreadedSelfTest.java
new file mode 100644
index 0000000..24dd89e
--- /dev/null
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapTieredMultithreadedSelfTest.java
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.query.*;
+import org.apache.ignite.cache.query.annotations.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import javax.cache.*;
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
+
+/**
+ * Based on Yardstick benchmark.
+ */
+public class IgniteCacheOffheapTieredMultithreadedSelfTest extends 
GridCommonAbstractTest {
+    /** */
+    private static TcpDiscoveryIpFinder ipFinder = new 
TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static final int RANGE = 1_000_000;
+
+    /** */
+    private static IgniteCache<Integer, Object> cache;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        disco.setIpFinder(ipFinder);
+
+        cfg.setDiscoverySpi(disco);
+
+        CacheConfiguration<?,?> cacheCfg = new CacheConfiguration<>();
+
+        cacheCfg.setCacheMode(PARTITIONED);
+        cacheCfg.setAtomicityMode(ATOMIC);
+        cacheCfg.setSwapEnabled(false);
+        cacheCfg.setBackups(1);
+        cacheCfg.setMemoryMode(CacheMemoryMode.OFFHEAP_TIERED);
+        cacheCfg.setIndexedTypes(
+            Integer.class, Person.class
+        );
+
+        cfg.setCacheConfiguration(cacheCfg);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        startGridsMultiThreaded(3);
+
+        cache = grid(0).jcache(null);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return 10 * 60 * 1000;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testQueryPut() throws Exception {
+        final AtomicBoolean end = new AtomicBoolean();
+
+        IgniteInternalFuture<?> fut = multithreadedAsync(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                while(!end.get()) {
+                    if (rnd.nextInt(5) == 0) {
+                        double salary = rnd.nextDouble() * RANGE * 1000;
+
+                        double maxSalary = salary + 1000;
+
+                        Collection<Cache.Entry<Integer, Object>> entries = 
executeQuery(salary, maxSalary);
+
+                        for (Cache.Entry<Integer, Object> entry : entries) {
+                            Person p = (Person)entry.getValue();
+
+                            if (p.getSalary() < salary || p.getSalary() > 
maxSalary)
+                                throw new Exception("Invalid person retrieved 
[min=" + salary + ", max=" + maxSalary +
+                                    ", person=" + p + ']');
+                        }
+                    }
+                    else {
+                        int i = rnd.nextInt(RANGE);
+
+                        cache.put(i, new Person(i, "firstName" + i, "lastName" 
+ i, i * 1000));
+                    }
+                }
+
+                return null;
+            }
+        }, 5);
+
+        Thread.sleep(5 * 60 * 1000);
+
+        end.set(true);
+
+        fut.get();
+    }
+
+
+
+    /**
+     * @param minSalary Min salary.
+     * @param maxSalary Max salary.
+     * @return Query result.
+     * @throws Exception If failed.
+     */
+    private Collection<Cache.Entry<Integer, Object>> executeQuery(double 
minSalary, double maxSalary) throws Exception {
+        SqlQuery qry = new SqlQuery(Person.class, "salary >= ? and salary <= 
?");
+
+        qry.setArgs(minSalary, maxSalary);
+
+        return cache.query(qry).getAll();
+    }
+
+    /**
+     * Person record used for query test.
+     */
+    public static class Person implements Externalizable {
+        /** Person ID. */
+        @QuerySqlField(index = true)
+        private int id;
+
+        /** Organization ID. */
+        @QuerySqlField(index = true)
+        private int orgId;
+
+        /** First name (not-indexed). */
+        @QuerySqlField
+        private String firstName;
+
+        /** Last name (not indexed). */
+        @QuerySqlField
+        private String lastName;
+
+        /** Salary. */
+        @QuerySqlField(index = true)
+        private double salary;
+
+        /**
+         * Constructs empty person.
+         */
+        public Person() {
+            // No-op.
+        }
+
+        /**
+         * Constructs person record that is not linked to any organization.
+         *
+         * @param id Person ID.
+         * @param firstName First name.
+         * @param lastName Last name.
+         * @param salary Salary.
+         */
+        public Person(int id, String firstName, String lastName, double 
salary) {
+            this(id, 0, firstName, lastName, salary);
+        }
+
+        /**
+         * Constructs person record.
+         *
+         * @param id Person ID.
+         * @param orgId Organization ID.
+         * @param firstName First name.
+         * @param lastName Last name.
+         * @param salary Salary.
+         */
+        public Person(int id, int orgId, String firstName, String lastName, 
double salary) {
+            this.id = id;
+            this.orgId = orgId;
+            this.firstName = firstName;
+            this.lastName = lastName;
+            this.salary = salary;
+        }
+
+        /**
+         * @return Person id.
+         */
+        public int getId() {
+            return id;
+        }
+
+        /**
+         * @param id Person id.
+         */
+        public void setId(int id) {
+            this.id = id;
+        }
+
+        /**
+         * @return Organization id.
+         */
+        public int getOrganizationId() {
+            return orgId;
+        }
+
+        /**
+         * @param orgId Organization id.
+         */
+        public void setOrganizationId(int orgId) {
+            this.orgId = orgId;
+        }
+
+        /**
+         * @return Person first name.
+         */
+        public String getFirstName() {
+            return firstName;
+        }
+
+        /**
+         * @param firstName Person first name.
+         */
+        public void setFirstName(String firstName) {
+            this.firstName = firstName;
+        }
+
+        /**
+         * @return Person last name.
+         */
+        public String getLastName() {
+            return lastName;
+        }
+
+        /**
+         * @param lastName Person last name.
+         */
+        public void setLastName(String lastName) {
+            this.lastName = lastName;
+        }
+
+        /**
+         * @return Salary.
+         */
+        public double getSalary() {
+            return salary;
+        }
+
+        /**
+         * @param salary Salary.
+         */
+        public void setSalary(double salary) {
+            this.salary = salary;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws 
IOException {
+            out.writeInt(id);
+            out.writeInt(orgId);
+            out.writeUTF(firstName);
+            out.writeUTF(lastName);
+            out.writeDouble(salary);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
+            id = in.readInt();
+            orgId = in.readInt();
+            firstName = in.readUTF();
+            lastName = in.readUTF();
+            salary = in.readDouble();
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            return this == o || (o instanceof Person) && id == ((Person)o).id;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return id;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return "Person [firstName=" + firstName +
+                ", id=" + id +
+                ", orgId=" + orgId +
+                ", lastName=" + lastName +
+                ", salary=" + salary +
+                ']';
+        }
+    }
+}

Reply via email to