http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6ca063f/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java index 64d1246,0000000..7380e27 mode 100644,000000..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java @@@ -1,595 -1,0 +1,595 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.datastructures; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.affinity.*; +import org.apache.ignite.cache.datastructures.*; +import org.apache.ignite.cache.query.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.processors.cache.*; - import org.apache.ignite.internal.util.*; - import org.apache.ignite.lang.*; +import org.apache.ignite.internal.processors.cache.query.*; ++import org.apache.ignite.internal.util.*; +import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; ++import org.apache.ignite.lang.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.*; + +import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.*; + +/** + * Cache set implementation. + */ +public class GridCacheSetImpl<T> extends AbstractCollection<T> implements IgniteSet<T> { + /** Maximum number of item keys per batched cache put or remove. */ + private static final int BATCH_SIZE = 100; + + /** Cache context. */ + private final GridCacheContext ctx; + + /** Cache. */ + private final GridCache<GridCacheSetItemKey, Boolean> cache; + + /** Logger. */ + private final IgniteLogger log; + + /** Set name. */ + private final String name; + + /** Set unique ID. */ + private final IgniteUuid id; + + /** Collocation flag. */ + private final boolean collocated; + + /** Set header partition. */ + private final int hdrPart; + + /** Removed flag. */ + private volatile boolean rmvd; + + /** + * @param ctx Cache context. + * @param name Set name. + * @param hdr Set header. 
+ */ + @SuppressWarnings("unchecked") + public GridCacheSetImpl(GridCacheContext ctx, String name, GridCacheSetHeader hdr) { + this.ctx = ctx; + this.name = name; + id = hdr.id(); + collocated = hdr.collocated(); + + cache = ctx.cache(); + + log = ctx.logger(GridCacheSetImpl.class); + + hdrPart = ctx.affinity().partition(new GridCacheSetHeaderKey(name)); + } + + /** {@inheritDoc} */ + @Override public boolean collocated() { + return collocated; + } + + /** {@inheritDoc} */ + @Override public String name() { + return name; + } + + /** {@inheritDoc} */ + @Override public boolean removed() { + return rmvd; + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public int size() { + try { + onAccess(); + + if (ctx.isLocal() || ctx.isReplicated()) { + GridConcurrentHashSet<GridCacheSetItemKey> set = ctx.dataStructures().setData(id); + + return set != null ? set.size() : 0; + } + + CacheQuery qry = new GridCacheQueryAdapter<>(ctx, SET, null, null, null, + new GridSetQueryPredicate<>(id, collocated), false, false); + + Collection<ClusterNode> nodes = dataNodes(ctx.affinity().affinityTopologyVersion()); + + qry.projection(ctx.grid().forNodes(nodes)); + + Iterable<Integer> col = (Iterable<Integer>)qry.execute(new SumReducer()).get(); + + int sum = 0; + + for (Integer val : col) + sum += val; + + return sum; + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public boolean isEmpty() { + onAccess(); + + GridConcurrentHashSet<GridCacheSetItemKey> set = ctx.dataStructures().setData(id); + + return (set == null || set.isEmpty()) && size() == 0; + } + + /** {@inheritDoc} */ + @Override public boolean contains(Object o) { + onAccess(); + + final GridCacheSetItemKey key = itemKey(o); + + return retry(new Callable<Boolean>() { + @Override public Boolean call() throws Exception { + return cache.get(key) != null; + } + }); + } + + /** {@inheritDoc} */ + @Override 
public boolean add(T o) { + onAccess(); + + final GridCacheSetItemKey key = itemKey(o); + + return retry(new Callable<Boolean>() { + @Override public Boolean call() throws Exception { + return cache.putxIfAbsent(key, true); + } + }); + } + + /** {@inheritDoc} */ + @Override public boolean remove(Object o) { + onAccess(); + + final GridCacheSetItemKey key = itemKey(o); + + return retry(new Callable<Boolean>() { + @Override public Boolean call() throws Exception { + return cache.removex(key); + } + }); + } + + /** {@inheritDoc} */ + @Override public boolean containsAll(Collection<?> c) { + for (Object obj : c) { + if (!contains(obj)) + return false; + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean addAll(Collection<? extends T> c) { + onAccess(); + + boolean add = false; + + Map<GridCacheSetItemKey, Boolean> addKeys = null; + + for (T obj : c) { + if (add) { + if (addKeys == null) + addKeys = U.newHashMap(BATCH_SIZE); + + addKeys.put(itemKey(obj), true); + + if (addKeys.size() == BATCH_SIZE) { + retryPutAll(addKeys); + + addKeys.clear(); + } + } + else + add |= add(obj); + } + + if (!F.isEmpty(addKeys)) + retryPutAll(addKeys); + + return add; + } + + /** {@inheritDoc} */ + @Override public boolean removeAll(Collection<?> c) { + onAccess(); + + boolean rmv = false; + + Set<GridCacheSetItemKey> rmvKeys = null; + + for (Object obj : c) { + if (rmv) { + if (rmvKeys == null) + rmvKeys = U.newHashSet(BATCH_SIZE); + + rmvKeys.add(itemKey(obj)); + + if (rmvKeys.size() == BATCH_SIZE) { + retryRemoveAll(rmvKeys); + + rmvKeys.clear(); + } + } + else + rmv |= remove(obj); + } + + if (!F.isEmpty(rmvKeys)) + retryRemoveAll(rmvKeys); + + return rmv; + } + + /** {@inheritDoc} */ + @Override public boolean retainAll(Collection<?> c) { + try { + onAccess(); + + try (GridCloseableIterator<T> iter = iterator0()) { + boolean rmv = false; + + Set<GridCacheSetItemKey> rmvKeys = null; + + for (T val : iter) { + if (!c.contains(val)) { + rmv = true; + + if 
+ (rmvKeys == null) + rmvKeys = U.newHashSet(BATCH_SIZE); + + rmvKeys.add(itemKey(val)); + + if (rmvKeys.size() == BATCH_SIZE) { + retryRemoveAll(rmvKeys); + + rmvKeys.clear(); + } + } + } + + if (!F.isEmpty(rmvKeys)) + retryRemoveAll(rmvKeys); + + return rmv; + } + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + + /** {@inheritDoc} */ + @Override public void clear() { + try { + onAccess(); + + try (GridCloseableIterator<T> iter = iterator0()) { + Collection<GridCacheSetItemKey> rmvKeys = new ArrayList<>(BATCH_SIZE); + + for (T val : iter) { + rmvKeys.add(itemKey(val)); + + if (rmvKeys.size() == BATCH_SIZE) { + retryRemoveAll(rmvKeys); + + rmvKeys.clear(); + } + } + + if (!rmvKeys.isEmpty()) + retryRemoveAll(rmvKeys); + } + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + + /** {@inheritDoc} */ + @Override public Iterator<T> iterator() { + onAccess(); + + return iterator0(); + } + + /** {@inheritDoc} */ + @Override public void close() { + try { + if (rmvd) + return; + + ctx.dataStructures().removeSet(name); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + + /** + * Executes the set query on the nodes returned by {@code dataNodes()} and wraps the query future + * into an iterator obtained from the context iterators holder. If the set is already flagged as + * removed, the iterator is dropped from the holder and the removed check is triggered. + * + * @return Closeable iterator over set items. + */ + @SuppressWarnings("unchecked") + private GridCloseableIterator<T> iterator0() { + try { + CacheQuery qry = new GridCacheQueryAdapter<>(ctx, SET, null, null, null, + new GridSetQueryPredicate<>(id, collocated), false, false); + + Collection<ClusterNode> nodes = dataNodes(ctx.affinity().affinityTopologyVersion()); + + qry.projection(ctx.grid().forNodes(nodes)); + + CacheQueryFuture<Map.Entry<T, ?>> fut = qry.execute(); + + CacheWeakQueryIteratorsHolder.WeakQueryFutureIterator it = + ctx.itHolder().iterator(fut, new CacheIteratorConverter<T, Map.Entry<T, ?>>() { + @Override protected T convert(Map.Entry<T, ?> e) { + return e.getKey(); + } + + @Override protected void remove(T item) { + GridCacheSetImpl.this.remove(item); + } + }); + + if (rmvd) { + ctx.itHolder().removeIterator(it); + + 
+ checkRemoved(); + } + + return it; + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + + /** + * Runs the callable through {@code CacheDataStructuresProcessor.retry}, rethrowing any + * {@link IgniteCheckedException} as {@link IgniteException}. + * + * @param call Callable. + * @return Callable result. + */ + private <R> R retry(Callable<R> call) { + try { + return CacheDataStructuresProcessor.retry(log, call); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + + /** + * Removes the given keys from the cache via {@code retry}. + * + * @param keys Keys to remove. + */ + private void retryRemoveAll(final Collection<GridCacheSetItemKey> keys) { + retry(new Callable<Void>() { + @Override public Void call() throws Exception { + cache.removeAll(keys); + + return null; + } + }); + } + + /** + * Puts the given entries into the cache via {@code retry}. + * + * @param keys Item keys to put, with their values. + */ + private void retryPutAll(final Map<GridCacheSetItemKey, Boolean> keys) { + retry(new Callable<Void>() { + @Override public Void call() throws Exception { + cache.putAll(keys); + + return null; + } + }); + } + + /** + * @param topVer Topology version. + * @return Nodes where set data request should be sent. + * @throws IgniteCheckedException If all cache nodes left grid. + */ + @SuppressWarnings("unchecked") + private Collection<ClusterNode> dataNodes(long topVer) throws IgniteCheckedException { + if (ctx.isLocal() || ctx.isReplicated()) + return Collections.singleton(ctx.localNode()); + + Collection<ClusterNode> nodes; + + if (collocated) { + List<ClusterNode> nodes0 = ctx.affinity().nodes(hdrPart, topVer); + + nodes = !nodes0.isEmpty() ? + Collections.singleton(nodes0.contains(ctx.localNode()) ? ctx.localNode() : F.first(nodes0)) : nodes0; + } + else + nodes = CU.affinityNodes(ctx, topVer); + + if (nodes.isEmpty()) + throw new IgniteCheckedException("Failed to get set data, all cache nodes left grid."); + + return nodes; + } + + /** + * @param rmvd Removed flag. 
+ */ + void removed(boolean rmvd) { + if (this.rmvd) + return; + + this.rmvd = rmvd; + + if (rmvd) + ctx.itHolder().clearQueries(); + } + + /** + * Throws {@link org.apache.ignite.cache.datastructures.DataStructureRemovedException} if set was removed. + */ + private void checkRemoved() { + if (rmvd) + throw new DataStructureRemovedException("Set has been removed from cache: " + this); + } + + /** + * Checks if set was removed and handles iterators weak reference queue. + */ + private void onAccess() { + ctx.itHolder().checkWeakQueue(); + + checkRemoved(); + } + + /** + * @return Set ID. + */ + IgniteUuid id() { + return id; + } + + /** + * @return Cache context. + */ + GridCacheContext context() { + return ctx; + } + + /** + * @param item Set item. + * @return Item key. + */ + private GridCacheSetItemKey itemKey(Object item) { + return collocated ? new CollocatedItemKey(name, id, item) : new GridCacheSetItemKey(id, item); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridCacheSetImpl.class, this); + } + + /** + * + */ + private static class SumReducer implements IgniteReducer<Object, Integer>, Externalizable { + /** */ + private static final long serialVersionUID = -3436987759126521204L; + + /** */ + private int cntr; + + /** + * Required by {@link Externalizable}. + */ + public SumReducer() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public boolean collect(@Nullable Object o) { + cntr++; + + return true; + } + + /** {@inheritDoc} */ + @Override public Integer reduce() { + return cntr; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + // No-op. + } + } + + /** + * Item key for collocated set. 
+ */ + private static class CollocatedItemKey extends GridCacheSetItemKey { + /** */ + private static final long serialVersionUID = -1400701398705953750L; + + /** */ + private String setName; + + /** + * Required by {@link Externalizable}. + */ + public CollocatedItemKey() { + // No-op. + } + + /** + * @param setName Set name. + * @param setId Set unique ID. + * @param item Set item. + */ + private CollocatedItemKey(String setName, IgniteUuid setId, Object item) { + super(setId, item); + + this.setName = setName; + } + + /** + * @return Item affinity key. + */ + @CacheAffinityKeyMapped + public Object affinityKey() { + return setName; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + super.writeExternal(out); + + U.writeString(out, setName); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + super.readExternal(in); + + setName = U.readString(in); + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6ca063f/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetItemKey.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetItemKey.java index 6cfa70c,0000000..390b6df mode 100644,000000..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetItemKey.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetItemKey.java @@@ -1,109 -1,0 +1,109 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.datastructures; + +import org.apache.ignite.internal.processors.cache.*; - import org.apache.ignite.lang.*; +import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.internal.util.typedef.internal.*; ++import org.apache.ignite.lang.*; + +import java.io.*; + +/** + * Set item key. 
+ */ +public class GridCacheSetItemKey implements GridCacheInternal, Externalizable { + /** Serial version UID. */ + private static final long serialVersionUID = 0L; + + /** Unique ID of the set this key belongs to. */ + private IgniteUuid setId; + + /** Set item wrapped by this key. */ + @GridToStringInclude + private Object item; + + /** + * Required by {@link Externalizable}. + */ + public GridCacheSetItemKey() { + // No-op. + } + + /** + * @param setId Set unique ID. + * @param item Set item. + */ + GridCacheSetItemKey(IgniteUuid setId, Object item) { + this.setId = setId; + this.item = item; + } + + /** + * @return Set UUID. + */ + public IgniteUuid setId() { + return setId; + } + + /** + * @return Set item. + */ + public Object item() { + return item; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int result = setId.hashCode(); + + result = 31 * result + item.hashCode(); + + return result; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + GridCacheSetItemKey that = (GridCacheSetItemKey)o; + + return setId.equals(that.setId) && item.equals(that.item); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + U.writeGridUuid(out, setId); + out.writeObject(item); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + setId = U.readGridUuid(in); + item = in.readObject(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridCacheSetItemKey.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6ca063f/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java index 819d6d3,0000000..8fec999 mode 
100644,000000..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java @@@ -1,563 -1,0 +1,563 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.datastructures; + +import org.apache.ignite.*; +import org.apache.ignite.cache.datastructures.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.util.*; - import org.apache.ignite.lang.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; ++import org.apache.ignite.lang.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.*; + +/** + * Cache set proxy. + */ +public class GridCacheSetProxy<T> implements IgniteSet<T>, Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** Deserialization stash. 
*/ + private static final ThreadLocal<IgniteBiTuple<GridKernalContext, String>> stash = + new ThreadLocal<IgniteBiTuple<GridKernalContext, String>>() { + @Override protected IgniteBiTuple<GridKernalContext, String> initialValue() { + return F.t2(); + } + }; + + /** Delegate set. */ + private GridCacheSetImpl<T> delegate; + + /** Cache context. */ + private GridCacheContext cctx; + + /** Cache gateway. */ + private GridCacheGateway gate; + + /** Busy lock. */ + private GridSpinBusyLock busyLock; + + /** + * Required by {@link Externalizable}. + */ + public GridCacheSetProxy() { + // No-op. + } + + /** + * @param cctx Cache context. + * @param delegate Delegate set. + */ + public GridCacheSetProxy(GridCacheContext cctx, GridCacheSetImpl<T> delegate) { + this.cctx = cctx; + this.delegate = delegate; + + gate = cctx.gate(); + + busyLock = new GridSpinBusyLock(); + } + + /** + * Remove callback. + */ + public void blockOnRemove() { + delegate.removed(true); + + busyLock.block(); + } + + /** {@inheritDoc} */ + @Override public int size() { + enterBusy(); + + try { + gate.enter(); + + try { + if (cctx.transactional()) + return CU.outTx(new Callable<Integer>() { + @Override public Integer call() throws Exception { + return delegate.size(); + } + }, cctx); + + return delegate.size(); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + finally { + gate.leave(); + } + } + finally { + leaveBusy(); + } + } + + /** {@inheritDoc} */ + @Override public boolean isEmpty() { + enterBusy(); + + try { + gate.enter(); + + try { + if (cctx.transactional()) + return CU.outTx(new Callable<Boolean>() { + @Override public Boolean call() throws Exception { + return delegate.isEmpty(); + } + }, cctx); + + return delegate.isEmpty(); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + finally { + gate.leave(); + } + } + finally { + leaveBusy(); + } + } + + /** {@inheritDoc} */ + @Override public boolean contains(final Object o) { + 
enterBusy(); + + try { + gate.enter(); + + try { + if (cctx.transactional()) + return CU.outTx(new Callable<Boolean>() { + @Override public Boolean call() throws Exception { + return delegate.contains(o); + } + }, cctx); + + return delegate.contains(o); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + finally { + gate.leave(); + } + } + finally { + leaveBusy(); + } + } + + /** {@inheritDoc} */ + @NotNull @Override public Object[] toArray() { + enterBusy(); + + try { + gate.enter(); + + try { + if (cctx.transactional()) + return CU.outTx(new Callable<Object[]>() { + @Override public Object[] call() throws Exception { + return delegate.toArray(); + } + }, cctx); + + return delegate.toArray(); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + finally { + gate.leave(); + } + } + finally { + leaveBusy(); + } + } + + /** {@inheritDoc} */ + @NotNull @Override public <T1> T1[] toArray(final T1[] a) { + enterBusy(); + + try { + gate.enter(); + + try { + if (cctx.transactional()) + return CU.outTx(new Callable<T1[]>() { + @Override public T1[] call() throws Exception { + return delegate.toArray(a); + } + }, cctx); + + return delegate.toArray(a); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + finally { + gate.leave(); + } + } + finally { + leaveBusy(); + } + } + + /** {@inheritDoc} */ + @Override public boolean add(final T t) { + enterBusy(); + + try { + gate.enter(); + + try { + if (cctx.transactional()) + return CU.outTx(new Callable<Boolean>() { + @Override public Boolean call() throws Exception { + return delegate.add(t); + } + }, cctx); + + return delegate.add(t); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + finally { + gate.leave(); + } + } + finally { + leaveBusy(); + } + } + + /** {@inheritDoc} */ + @Override public boolean remove(final Object o) { + enterBusy(); + + try { + gate.enter(); + + try { + if (cctx.transactional()) + return 
CU.outTx(new Callable<Boolean>() { + @Override public Boolean call() throws Exception { + return delegate.remove(o); + } + }, cctx); + + return delegate.remove(o); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + finally { + gate.leave(); + } + } + finally { + leaveBusy(); + } + } + + /** {@inheritDoc} */ + @Override public boolean containsAll(final Collection<?> c) { + enterBusy(); + + try { + gate.enter(); + + try { + if (cctx.transactional()) + return CU.outTx(new Callable<Boolean>() { + @Override public Boolean call() throws Exception { + return delegate.containsAll(c); + } + }, cctx); + + return delegate.containsAll(c); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + finally { + gate.leave(); + } + } + finally { + leaveBusy(); + } + } + + /** {@inheritDoc} */ + @Override public boolean addAll(final Collection<? extends T> c) { + enterBusy(); + + try { + gate.enter(); + + try { + if (cctx.transactional()) + return CU.outTx(new Callable<Boolean>() { + @Override public Boolean call() throws Exception { + return delegate.addAll(c); + } + }, cctx); + + return delegate.addAll(c); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + finally { + gate.leave(); + } + } + finally { + leaveBusy(); + } + } + + /** {@inheritDoc} */ + @Override public boolean retainAll(final Collection<?> c) { + enterBusy(); + + try { + gate.enter(); + + try { + if (cctx.transactional()) + return CU.outTx(new Callable<Boolean>() { + @Override public Boolean call() throws Exception { + return delegate.retainAll(c); + } + }, cctx); + + return delegate.retainAll(c); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + finally { + gate.leave(); + } + } + finally { + leaveBusy(); + } + } + + /** {@inheritDoc} */ + @Override public boolean removeAll(final Collection<?> c) { + enterBusy(); + + try { + gate.enter(); + + try { + if (cctx.transactional()) + return CU.outTx(new 
Callable<Boolean>() { + @Override public Boolean call() throws Exception { + return delegate.removeAll(c); + } + }, cctx); + + return delegate.removeAll(c); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + finally { + gate.leave(); + } + } + finally { + leaveBusy(); + } + } + + /** {@inheritDoc} */ + @Override public void clear() { + enterBusy(); + + try { + gate.enter(); + + try { + if (cctx.transactional()) { + CU.outTx(new Callable<Void>() { + @Override public Void call() throws Exception { + delegate.clear(); + + return null; + } + }, cctx); + } + else + delegate.clear(); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + finally { + gate.leave(); + } + } + finally { + leaveBusy(); + } + } + + /** {@inheritDoc} */ + @Override public Iterator<T> iterator() { + enterBusy(); + + try { + gate.enter(); + + try { + if (cctx.transactional()) + return CU.outTx(new Callable<Iterator<T>>() { + @Override public Iterator<T> call() throws Exception { + return delegate.iterator(); + } + }, cctx); + + return delegate.iterator(); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + finally { + gate.leave(); + } + } + finally { + leaveBusy(); + } + } + + /** {@inheritDoc} */ + @Override public void close() { + gate.enter(); + + try { + if (cctx.transactional()) { + CU.outTx(new Callable<Void>() { + @Override public Void call() throws Exception { + delegate.close(); + + return null; + } + }, cctx); + } + else + delegate.close(); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + finally { + gate.leave(); + } + } + + /** {@inheritDoc} */ + @Override public String name() { + return delegate.name(); + } + + /** {@inheritDoc} */ + @Override public boolean collocated() throws IgniteCheckedException { + return delegate.collocated(); + } + + /** {@inheritDoc} */ + @Override public boolean removed() { + return delegate.removed(); + } + + /** + * Enters busy state. 
+ */ + private void enterBusy() { + if (!busyLock.enterBusy()) + throw new DataStructureRemovedException("Set has been removed from cache: " + delegate); + } + + /** + * Leaves busy state. + */ + private void leaveBusy() { + busyLock.leaveBusy(); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(cctx.kernalContext()); + U.writeString(out, name()); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + IgniteBiTuple<GridKernalContext, String> t = stash.get(); + + t.set1((GridKernalContext)in.readObject()); + t.set2(U.readString(in)); + } + + /** + * Reconstructs object on unmarshalling. + * + * @return Reconstructed object. + * @throws ObjectStreamException Thrown in case of unmarshalling error. + */ + protected Object readResolve() throws ObjectStreamException { + try { + IgniteBiTuple<GridKernalContext, String> t = stash.get(); + + return t.get1().dataStructures().set(t.get2(), null, false); + } + catch (IgniteCheckedException e) { + throw U.withCause(new InvalidObjectException(e.getMessage()), e); + } + finally { + stash.remove(); + } + } + + /** {@inheritDoc} */ + @Override public String toString() { + return delegate.toString(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6ca063f/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridSetQueryPredicate.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridSetQueryPredicate.java index 037ff3d,0000000..7ae5974 mode 100644,000000..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridSetQueryPredicate.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridSetQueryPredicate.java @@@ -1,116 -1,0 +1,116 @@@ +/* + * Licensed 
to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.datastructures; + +import org.apache.ignite.*; +import org.apache.ignite.internal.processors.cache.*; - import org.apache.ignite.lang.*; +import org.apache.ignite.internal.util.typedef.internal.*; ++import org.apache.ignite.lang.*; + +import java.io.*; + +/** + * Predicate for query over {@link IgniteSet} items. + */ +public class GridSetQueryPredicate<K, V> implements IgniteBiPredicate<K, V>, Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private IgniteUuid setId; + + /** */ + private boolean collocated; + + /** */ + private GridCacheContext ctx; + + /** */ + private boolean filter; + + /** + * Required by {@link Externalizable}. + */ + public GridSetQueryPredicate() { + // No-op. + } + + /** + * @param setId Set ID. + * @param collocated Collocation flag. + */ + public GridSetQueryPredicate(IgniteUuid setId, boolean collocated) { + this.setId = setId; + this.collocated = collocated; + } + + /** + * @param ctx Cache context. + */ + public void init(GridCacheContext ctx) { + this.ctx = ctx; + + filter = filterKeys(); + } + + /** + * + * @return Collocation flag. 
+ */ + public boolean collocated() { + return collocated; + } + + /** + * @return Set ID. + */ + public IgniteUuid setId() { + return setId; + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public boolean apply(K k, V v) { + return !filter || ctx.affinity().primary(ctx.localNode(), k, ctx.affinity().affinityTopologyVersion()); + } + + /** + * @return {@code True} if need to filter out non-primary keys during processing of set data query. + */ + private boolean filterKeys() { + return !collocated && !(ctx.isLocal() || ctx.isReplicated()) && + (ctx.config().getBackups() > 0 || CU.isNearEnabled(ctx)); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + U.writeGridUuid(out, setId); + out.writeBoolean(collocated); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + setId = U.readGridUuid(in); + collocated = in.readBoolean(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridSetQueryPredicate.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6ca063f/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java index c2b0e8c,0000000..923c32b mode 100644,000000..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java @@@ -1,262 -1,0 +1,240 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.datastructures; + +import org.apache.ignite.*; - import org.apache.ignite.cluster.*; ++import org.apache.ignite.internal.cluster.*; +import org.apache.ignite.internal.processors.cache.*; - import org.apache.ignite.transactions.*; ++import org.apache.ignite.internal.processors.cache.transactions.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; + - import javax.cache.*; +import java.util.*; + +import static org.apache.ignite.transactions.IgniteTxConcurrency.*; +import static org.apache.ignite.transactions.IgniteTxIsolation.*; + +/** + * {@link org.apache.ignite.IgniteQueue} implementation using transactional cache. + */ +public class GridTransactionalCacheQueueImpl<T> extends GridCacheQueueAdapter<T> { - /** */ - private final IgniteTransactions txs; - + /** + * @param queueName Queue name. + * @param hdr Queue header. + * @param cctx Cache context. 
+ */ + public GridTransactionalCacheQueueImpl(String queueName, GridCacheQueueHeader hdr, GridCacheContext<?, ?> cctx) { + super(queueName, hdr, cctx); - - txs = cctx.kernalContext().grid().transactions(); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public boolean offer(final T item) throws IgniteException { + A.notNull(item, "item"); + + try { + boolean retVal; + + int cnt = 0; + + while (true) { + try { - try (IgniteTx tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { - Long idx = (Long)cache.invoke(queueKey, new AddProcessor(id, 1)); ++ try (IgniteInternalTx tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) { ++ Long idx = (Long)cache.invoke(queueKey, new AddProcessor(id, 1)).get(); + + if (idx != null) { + checkRemoved(idx); + + cache.put(itemKey(idx), item); + + retVal = true; + } + else + retVal = false; + + tx.commit(); + + break; + } + } - catch (CacheException e) { - if (e.getCause() instanceof ClusterGroupEmptyException) ++ catch (ClusterTopologyCheckedException e) { ++ if (e instanceof ClusterGroupEmptyCheckedException) + throw e; + - if (e.getCause() instanceof ClusterTopologyException) { - if (cnt++ == MAX_UPDATE_RETRIES) - throw e; - else { - U.warn(log, "Failed to add item, will retry [err=" + e + ']'); ++ if (cnt++ == MAX_UPDATE_RETRIES) ++ throw e; ++ else { ++ U.warn(log, "Failed to add item, will retry [err=" + e + ']'); + - U.sleep(RETRY_DELAY); - } ++ U.sleep(RETRY_DELAY); + } - else - throw e; + } + } + + return retVal; + } + catch (IgniteCheckedException e) { - throw new IgniteException(e); ++ throw U.convertException(e); + } + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Nullable @Override public T poll() throws IgniteException { + try { + int cnt = 0; + + T retVal; + + while (true) { - try (IgniteTx tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { - Long idx = (Long)cache.invoke(queueKey, new PollProcessor(id)); ++ try (IgniteInternalTx tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) { 
++ Long idx = (Long)cache.invoke(queueKey, new PollProcessor(id)).get(); + + if (idx != null) { + checkRemoved(idx); + - retVal = (T)cache.getAndRemove(itemKey(idx)); ++ retVal = (T)cache.remove(itemKey(idx), null); + + assert retVal != null; + } + else + retVal = null; + + tx.commit(); + + break; + } - catch (CacheException e) { - if (e.getCause() instanceof ClusterGroupEmptyException) ++ catch (ClusterTopologyCheckedException e) { ++ if (e instanceof ClusterGroupEmptyCheckedException) + throw e; + - if (e.getCause() instanceof ClusterTopologyException) { - if (cnt++ == MAX_UPDATE_RETRIES) - throw e; - else { - U.warn(log, "Failed to add item, will retry [err=" + e + ']'); ++ if (cnt++ == MAX_UPDATE_RETRIES) ++ throw e; ++ else { ++ U.warn(log, "Failed to add item, will retry [err=" + e + ']'); + - U.sleep(RETRY_DELAY); - } ++ U.sleep(RETRY_DELAY); + } - else - throw e; + } + } + + return retVal; + } + catch (IgniteCheckedException e) { - throw new IgniteException(e); ++ throw U.convertException(e); + } + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public boolean addAll(final Collection<? 
extends T> items) { + A.notNull(items, "items"); + + try { + boolean retVal; + + int cnt = 0; + + while (true) { - try (IgniteTx tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { - Long idx = (Long)cache.invoke(queueKey, new AddProcessor(id, items.size())); ++ try (IgniteInternalTx tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) { ++ Long idx = (Long)cache.invoke(queueKey, new AddProcessor(id, items.size())).get(); + + if (idx != null) { + checkRemoved(idx); + + Map<GridCacheQueueItemKey, T> putMap = new HashMap<>(); + + for (T item : items) { + putMap.put(itemKey(idx), item); + + idx++; + } + - cache.putAll(putMap); ++ cache.putAll(putMap, null); + + retVal = true; + } + else + retVal = false; + + tx.commit(); + + break; + } - catch (CacheException e) { - if (e.getCause() instanceof ClusterGroupEmptyException) ++ catch (ClusterTopologyCheckedException e) { ++ if (e instanceof ClusterGroupEmptyCheckedException) + throw e; + - if (e.getCause() instanceof ClusterTopologyException) { - if (cnt++ == MAX_UPDATE_RETRIES) - throw e; - else { - U.warn(log, "Failed to add item, will retry [err=" + e + ']'); ++ if (cnt++ == MAX_UPDATE_RETRIES) ++ throw e; ++ else { ++ U.warn(log, "Failed to add item, will retry [err=" + e + ']'); + - U.sleep(RETRY_DELAY); - } ++ U.sleep(RETRY_DELAY); + } - else - throw e; + } + } + + return retVal; + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override protected void removeItem(final long rmvIdx) throws IgniteCheckedException { + try { + int cnt = 0; + + while (true) { - try (IgniteTx tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { - Long idx = (Long)cache.invoke(queueKey, new RemoveProcessor(id, rmvIdx)); ++ try (IgniteInternalTx tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) { ++ Long idx = (Long)cache.invoke(queueKey, new RemoveProcessor(id, rmvIdx)).get(); + + if (idx != null) { + checkRemoved(idx); + - boolean rmv = 
cache.remove(itemKey(idx)); ++ boolean rmv = cache.removex(itemKey(idx)); + + assert rmv; + } + + tx.commit(); + + break; + } - catch (CacheException e) { - if (e.getCause() instanceof ClusterGroupEmptyException) ++ catch (ClusterTopologyCheckedException e) { ++ if (e instanceof ClusterGroupEmptyCheckedException) + throw e; + - if (e.getCause() instanceof ClusterTopologyException) { - if (cnt++ == MAX_UPDATE_RETRIES) - throw e; - else { - U.warn(log, "Failed to add item, will retry [err=" + e + ']'); ++ if (cnt++ == MAX_UPDATE_RETRIES) ++ throw e; ++ else { ++ U.warn(log, "Failed to add item, will retry [err=" + e + ']'); + - U.sleep(RETRY_DELAY); - } ++ U.sleep(RETRY_DELAY); + } - else - throw e; + } + } + } + catch (IgniteCheckedException e) { - throw new IgniteException(e); ++ throw U.convertException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6ca063f/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6ca063f/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6ca063f/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpMemcachedNioListener.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6ca063f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentTxMultiNodeTest.java ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6ca063f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConfigurationConsistencySelfTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6ca063f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheQueryInternalKeysSelfTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6ca063f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReferenceCleanupSelfTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6ca063f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java index c8b45fc,bf25087..fe3bc54 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java @@@ -19,11 -19,13 +19,12 @@@ package org.apache.ignite.internal.proc import org.apache.ignite.*; import org.apache.ignite.cache.*; -import org.apache.ignite.cache.datastructures.*; +import org.apache.ignite.configuration.*; + import org.apache.ignite.internal.*; -import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.util.*; + import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.lang.*; import org.apache.ignite.resources.*; - 
import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.testframework.*; import java.util.*; @@@ -130,9 -120,11 +131,10 @@@ public abstract class GridCacheAbstract * @throws Exception If failed. */ public void testAtomicLongConstantTopologyChange() throws Exception { - try { - CacheAtomicLong s = cache().dataStructures().atomicLong(STRUCTURE_NAME, 1, true); - + try (IgniteAtomicLong s = grid(0).atomicLong(STRUCTURE_NAME, 1, true)) { - IgniteFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() { - @Override public void apply() { + IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() { - @Override public void apply() { ++ @Override ++ public void apply() { try { for (int i = 0; i < TOP_CHANGE_CNT; i++) { String name = UUID.randomUUID().toString(); @@@ -175,8 -170,10 +177,8 @@@ * @throws Exception If failed. */ public void testAtomicLongConstantMultipleTopologyChange() throws Exception { - try { - CacheAtomicLong s = cache().dataStructures().atomicLong(STRUCTURE_NAME, 1, true); - + try (IgniteAtomicLong s = grid(0).atomicLong(STRUCTURE_NAME, 1, true)) { - IgniteFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() { + IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() { @Override public void apply() { try { for (int i = 0; i < TOP_CHANGE_CNT; i++) { @@@ -244,9 -249,11 +246,10 @@@ * @throws Exception If failed. 
*/ public void testAtomicReferenceConstantTopologyChange() throws Exception { - try { - CacheAtomicReference<Integer> s = cache().dataStructures().atomicReference(STRUCTURE_NAME, 1, true); - + try (IgniteAtomicReference<Integer> s = grid(0).atomicReference(STRUCTURE_NAME, 1, true)) { - IgniteFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() { - @Override public void apply() { + IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() { - @Override public void apply() { ++ @Override ++ public void apply() { try { for (int i = 0; i < TOP_CHANGE_CNT; i++) { String name = UUID.randomUUID().toString(); @@@ -287,8 -298,10 +290,8 @@@ * @throws Exception If failed. */ public void testAtomicReferenceConstantMultipleTopologyChange() throws Exception { - try { - CacheAtomicReference<Integer> s = cache().dataStructures().atomicReference(STRUCTURE_NAME, 1, true); - + try (IgniteAtomicReference<Integer> s = grid(0).atomicReference(STRUCTURE_NAME, 1, true)) { - IgniteFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() { + IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() { @Override public void apply() { try { for (int i = 0; i < TOP_CHANGE_CNT; i++) { @@@ -360,9 -384,11 +363,10 @@@ * @throws Exception If failed. */ public void testAtomicStampedConstantTopologyChange() throws Exception { - try { - CacheAtomicStamped<Integer, Integer> s = cache().dataStructures().atomicStamped(STRUCTURE_NAME, 1, 1, true); - + try (IgniteAtomicStamped<Integer, Integer> s = grid(0).atomicStamped(STRUCTURE_NAME, 1, 1, true)) { - IgniteFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() { - @Override public void apply() { + IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() { - @Override public void apply() { ++ @Override ++ public void apply() { try { for (int i = 0; i < TOP_CHANGE_CNT; i++) { String name = UUID.randomUUID().toString(); @@@ -416,8 -447,10 +420,8 @@@ * @throws Exception If failed. 
*/ public void testAtomicStampedConstantMultipleTopologyChange() throws Exception { - try { - CacheAtomicStamped<Integer, Integer> s = cache().dataStructures().atomicStamped(STRUCTURE_NAME, 1, 1, true); - + try (IgniteAtomicStamped<Integer, Integer> s = grid(0).atomicStamped(STRUCTURE_NAME, 1, 1, true)) { - IgniteFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() { + IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() { @Override public void apply() { try { for (int i = 0; i < TOP_CHANGE_CNT; i++) { @@@ -501,49 -541,54 +505,49 @@@ * @throws Exception If failed. */ public void testCountDownLatchConstantTopologyChange() throws Exception { - try { - CacheCountDownLatch s = cache().dataStructures().countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, - false, true); - - IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() { - @Override public void apply() { - try { - for (int i = 0; i < TOP_CHANGE_CNT; i++) { - String name = UUID.randomUUID().toString(); - - try { - Ignite g = startGrid(name); + try (IgniteCountDownLatch s = grid(0).countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, true)) { + try { - IgniteFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() { ++ IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() { + @Override public void apply() { + try { + for (int i = 0; i < TOP_CHANGE_CNT; i++) { + String name = UUID.randomUUID().toString(); + + try { + Ignite g = startGrid(name); - assert g.cache(null).dataStructures().countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, - false, false) != null; - } - finally { - if (i != TOP_CHANGE_CNT - 1) - stopGrid(name); + assert g.countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, false) != null; + } + finally { + if (i != TOP_CHANGE_CNT - 1) + stopGrid(name); + } } } + catch (Exception e) { + throw F.wrap(e); + } } - catch (Exception e) { - throw F.wrap(e); - } - } - }, TOP_CHANGE_THREAD_CNT, "topology-change-thread"); - - int val 
= s.count(); + }, TOP_CHANGE_THREAD_CNT, "topology-change-thread"); - while (!fut.isDone()) { - assert s.count() == val; + int val = s.count(); - assert s.countDown() == val - 1; + while (!fut.isDone()) { + assert s.count() == val; - val--; - } + assert s.countDown() == val - 1; - fut.get(); + val--; + } - for (Ignite g : G.allGrids()) - assert g.cache(null).dataStructures().countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, true) - .count() == val; - } - finally { - cache().dataStructures().countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, true).countDownAll(); + fut.get(); - cache().dataStructures().removeCountDownLatch(STRUCTURE_NAME); + for (Ignite g : G.allGrids()) + assert g.countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, true).count() == val; + } + finally { + grid(0).countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, true).countDownAll(); + } } } @@@ -551,56 -596,61 +555,56 @@@ * @throws Exception If failed. */ public void testCountDownLatchConstantMultipleTopologyChange() throws Exception { - try { - CacheCountDownLatch s = cache().dataStructures() - .countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, true); - - IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() { - @Override public void apply() { - try { - for (int i = 0; i < TOP_CHANGE_CNT; i++) { - Collection<String> names = new GridLeanSet<>(3); + try (IgniteCountDownLatch s = grid(0).countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, true)) { + try { - IgniteFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() { ++ IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() { + @Override public void apply() { + try { + for (int i = 0; i < TOP_CHANGE_CNT; i++) { + Collection<String> names = new GridLeanSet<>(3); - try { - for (int j = 0; j < 3; j++) { - String name = UUID.randomUUID().toString(); + try { + for (int j = 0; j < 3; j++) { + String name = UUID.randomUUID().toString(); - names.add(name); + 
names.add(name); - Ignite g = startGrid(name); + Ignite g = startGrid(name); - assert g.cache(null).dataStructures() - .countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, false) != null; + assert g.countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, false) != null; + } + } + finally { + if (i != TOP_CHANGE_CNT - 1) + for (String name : names) + stopGrid(name); } - } - finally { - if (i != TOP_CHANGE_CNT - 1) - for (String name : names) - stopGrid(name); } } + catch (Exception e) { + throw F.wrap(e); + } } - catch (Exception e) { - throw F.wrap(e); - } - } - }, TOP_CHANGE_THREAD_CNT, "topology-change-thread"); + }, TOP_CHANGE_THREAD_CNT, "topology-change-thread"); - int val = s.count(); - - while (!fut.isDone()) { - assert s.count() == val; + int val = s.count(); - assert s.countDown() == val - 1; + while (!fut.isDone()) { + assert s.count() == val; - val--; - } + assert s.countDown() == val - 1; - fut.get(); + val--; + } - for (Ignite g : G.allGrids()) - assert g.cache(null).dataStructures() - .countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, false).count() == val; - } - finally { - cache().dataStructures().countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, false).countDownAll(); + fut.get(); - cache().dataStructures().removeCountDownLatch(STRUCTURE_NAME); + for (Ignite g : G.allGrids()) + assertEquals(val, g.countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, false).count()); + } + finally { + grid(0).countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, false).countDownAll(); + } } } @@@ -629,11 -679,13 +633,11 @@@ /** * @throws Exception If failed. 
*/ - public void testFifoQueueConstantTopologyChange() throws Exception { - try { - CacheQueue<Integer> s = cache().dataStructures().queue(STRUCTURE_NAME, 0, false, true); - + public void testQueueConstantTopologyChange() throws Exception { + try (IgniteQueue<Integer> s = grid(0).queue(STRUCTURE_NAME, collectionConfiguration(), 0, true)) { s.put(1); - IgniteFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() { + IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() { @Override public void apply() { try { for (int i = 0; i < TOP_CHANGE_CNT; i++) { @@@ -673,11 -729,13 +677,11 @@@ /** * @throws Exception If failed. */ - public void testFifoQueueConstantMultipleTopologyChange() throws Exception { - try { - CacheQueue<Integer> s = cache().dataStructures().queue(STRUCTURE_NAME, 0, false, true); - + public void testQueueConstantMultipleTopologyChange() throws Exception { + try (IgniteQueue<Integer> s = grid(0).queue(STRUCTURE_NAME, collectionConfiguration(), 0, true)) { s.put(1); - IgniteFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() { + IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() { @Override public void apply() { try { for (int i = 0; i < TOP_CHANGE_CNT; i++) { @@@ -740,8 -807,10 +744,8 @@@ * @throws Exception If failed. */ public void testAtomicSequenceConstantTopologyChange() throws Exception { - try { - CacheAtomicSequence s = cache().dataStructures().atomicSequence(STRUCTURE_NAME, 1, true); - + try (IgniteAtomicSequence s = grid(0).atomicSequence(STRUCTURE_NAME, 1, true)) { - IgniteFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() { + IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() { @Override public void apply() { try { String name = UUID.randomUUID().toString(); @@@ -835,8 -909,10 +839,8 @@@ * @throws Exception If failed. 
*/ public void testAtomicSequenceConstantMultipleTopologyChange() throws Exception { - try { - CacheAtomicSequence s = cache().dataStructures().atomicSequence(STRUCTURE_NAME, 1, true); - + try (IgniteAtomicSequence s = grid(0).atomicSequence(STRUCTURE_NAME, 1, true)) { - IgniteFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() { + IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() { @Override public void apply() { try { for (int i = 0; i < TOP_CHANGE_CNT; i++) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6ca063f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractQueueFailoverDataConsistencySelfTest.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractQueueFailoverDataConsistencySelfTest.java index f0739bf,30bae26..629be11 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractQueueFailoverDataConsistencySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractQueueFailoverDataConsistencySelfTest.java @@@ -22,8 -21,9 +22,7 @@@ import org.apache.ignite.cache.* import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.datastructures.*; - import org.apache.ignite.lang.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.testframework.*; @@@ -160,9 -172,9 +159,9 @@@ public abstract class GridCacheAbstract final AtomicBoolean stop = new AtomicBoolean(); - IgniteFuture<?> fut = startNodeKiller(stop, new AtomicInteger(), killIdxs); + IgniteInternalFuture<?> fut = startNodeKiller(stop, new AtomicInteger(), killIdxs); - final int ITEMS = (atomicityMode() == ATOMIC) ? 
10_000 : 3000; + final int ITEMS = (collectionCacheAtomicityMode() == ATOMIC) ? 10_000 : 3000; try { for (int i = 0; i < ITEMS; i++) { @@@ -323,8 -331,7 +322,8 @@@ * @param killIdxs Indexes of nodes to kill. * @return Future completing when thread finishes. */ - private IgniteFuture<?> startNodeKiller(final AtomicBoolean stop, - private IgniteInternalFuture<?> startNodeKiller(final AtomicBoolean stop, final AtomicInteger killCnt, ++ private IgniteInternalFuture<?> startNodeKiller(final AtomicBoolean stop, + final AtomicInteger killCnt, final List<Integer> killIdxs) { return GridTestUtils.runAsync(new Callable<Void>() { @Override public Void call() throws Exception { @@@ -352,17 -359,14 +351,17 @@@ } /** + * @param queue Queue. * @return Primary node for queue's header. */ - private int primaryQueueNode() { - CacheAffinity<Object> aff = grid(0).cache(null).affinity(); + private int primaryQueueNode(IgniteQueue queue) { + GridCacheContext cctx = GridTestUtils.getFieldValue(queue, "cctx"); + + GridCacheAffinityManager aff = cctx.affinity(); for (int i = 0; i < gridCount(); i++) { - for (GridCacheEntryEx e : ((GridKernal)grid(i)).context().cache().internalCache(cctx.name()).map().allEntries0()) { - for (GridCacheEntryEx e : ((IgniteKernal)grid(i)).context().cache().internalCache().map().allEntries0()) { - if (aff.isPrimary(grid(i).localNode(), e.key()) && e.key() instanceof GridCacheQueueHeaderKey) ++ for (GridCacheEntryEx e : ((IgniteKernal)grid(i)).context().cache().internalCache(cctx.name()).map().allEntries0()) { + if (aff.primary(grid(i).localNode(), e.key(), -1) && e.key() instanceof GridCacheQueueHeaderKey) return i; } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6ca063f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheMultiNodeDataStructureTest.java ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6ca063f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueCleanupSelfTest.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueCleanupSelfTest.java index 6a003bd,412f2e4..5661c00 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueCleanupSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueCleanupSelfTest.java @@@ -19,12 -19,10 +19,11 @@@ package org.apache.ignite.internal.proc import org.apache.ignite.*; import org.apache.ignite.cache.*; -import org.apache.ignite.cache.datastructures.*; import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.datastructures.*; - import org.apache.ignite.lang.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.testframework.*; @@@ -178,10 -177,8 +177,10 @@@ public class GridCacheQueueCleanupSelfT int cnt = 0; for (int i = 0; i < gridCount(); i++) { - Iterator<GridCacheEntryEx<Object, Object>> entries = - ((IgniteKernal)grid(i)).context().cache().internalCache().map().allEntries0().iterator(); + GridCacheAdapter<Object, Object> cache = - ((GridKernal)grid(i)).context().cache().internalCache(queueCacheName); ++ ((IgniteKernal)grid(i)).context().cache().internalCache(queueCacheName); + + Iterator<GridCacheEntryEx<Object, Object>> entries = cache.map().allEntries0().iterator(); while (entries.hasNext()) { cnt++; @@@ -210,10 -207,10 +209,10 @@@ * @param queueName Queue name. * @return Future completing when thread finishes. 
*/ - private IgniteFuture<?> startAddPollThread(final Ignite ignite, final AtomicBoolean stop, final String queueName) { + private IgniteInternalFuture<?> startAddPollThread(final Ignite ignite, final AtomicBoolean stop, final String queueName) { return GridTestUtils.runAsync(new Callable<Void>() { @Override public Void call() throws Exception { - CacheQueue<Integer> queue = ignite.cache(null).dataStructures().queue(queueName, 0, false, true); + IgniteQueue<Integer> queue = ignite.queue(queueName, collectionConfiguration(), 0, true); assertEquals(0, queue.size()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6ca063f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueJoinedNodeSelfAbstractTest.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueJoinedNodeSelfAbstractTest.java index ac01d07,95a84b9..74f0892 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueJoinedNodeSelfAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueJoinedNodeSelfAbstractTest.java @@@ -18,7 -18,11 +18,10 @@@ package org.apache.ignite.internal.processors.cache.datastructures; import org.apache.ignite.*; -import org.apache.ignite.cache.datastructures.*; import org.apache.ignite.configuration.*; + import org.apache.ignite.internal.*; + import org.apache.ignite.internal.util.tostring.*; + import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; import org.apache.ignite.resources.*; import org.apache.ignite.spi.discovery.tcp.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6ca063f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueMultiNodeAbstractSelfTest.java 
---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueMultiNodeAbstractSelfTest.java index 517c76a,84eee83..bc7a037 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueMultiNodeAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueMultiNodeAbstractSelfTest.java @@@ -18,21 -18,21 +18,16 @@@ package org.apache.ignite.internal.processors.cache.datastructures; import org.apache.ignite.*; --import org.apache.ignite.cache.*; -import org.apache.ignite.cache.datastructures.*; import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; - import org.apache.ignite.internal.processors.datastructures.*; - import org.apache.ignite.lang.*; - import org.apache.ignite.marshaller.optimized.*; - import org.apache.ignite.resources.*; - import org.apache.ignite.spi.discovery.tcp.*; - import org.apache.ignite.spi.discovery.tcp.ipfinder.*; - import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; + import org.apache.ignite.internal.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; + import org.apache.ignite.lang.*; ++import org.apache.ignite.marshaller.optimized.*; + import org.apache.ignite.resources.*; -import org.apache.ignite.spi.discovery.tcp.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; import org.apache.ignite.testframework.*; --import org.apache.ignite.testframework.junits.common.*; import java.io.*; import java.util.*; @@@ -150,14 -178,16 +145,14 @@@ public abstract class GridCacheQueueMul final Ignite g = startGrid(GRID_CNT + 1); - IgniteFuture<Object> fut1 = GridTestUtils.runAsync(new Callable<Object>() { + 
IgniteInternalFuture<Object> fut1 = GridTestUtils.runAsync(new Callable<Object>() { @Override public Object call() throws Exception { info(">>> Executing put callable [node=" + g.cluster().localNode().id() + - ", thread=" + Thread.currentThread().getName() + ", aff=" + - F.nodeId8s(g.cache(null).affinity().mapKeyToPrimaryAndBackups( - new GridCacheInternalKeyImpl(queueName))) + ']'); + ", thread=" + Thread.currentThread().getName() + ']'); - CacheQueue<Integer> q = g.cache(null).dataStructures().queue(queueName, 5, true, true); + IgniteQueue<Integer> q = g.queue(queueName, collocatedCollectionConfiguration(), 5, true); - assert q.isEmpty(); + assertTrue(q.isEmpty()); for (int i = 0; i < ITEMS_CNT; i++) { if (i == q.capacity()) { @@@ -351,10 -385,12 +346,10 @@@ }, THREADS_PER_NODE, "testPutMultiNode")); } - for (IgniteFuture fut : futs) + for (IgniteInternalFuture fut : futs) fut.get(); - GridCache cache = grid(0).cache(null); - - CacheQueue<Integer> queue = cache.dataStructures().queue(queueName, 0, collocated, true); + IgniteQueue<Integer> queue = grid(0).queue(queueName, null, 0, false); assertEquals(THREADS_PER_NODE * ITEMS_PER_THREAD * GRID_CNT, queue.size()); @@@ -452,14 -485,12 +447,14 @@@ stopPoll.set(true); - for (IgniteFuture fut : pollFuts) + for (IgniteInternalFuture fut : pollFuts) fut.get(); - GridCache cache = grid(0).cache(null); + IgniteCollectionConfiguration colCfg = collectionConfiguration(); + + colCfg.setCollocated(collocated); - CacheQueue<Integer> queue = cache.dataStructures().queue(queueName, 0, collocated, true); + IgniteQueue<Integer> queue = grid(0).queue(queueName, colCfg, 0, true); int[] resItems = new int[ITEMS_PER_THREAD]; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6ca063f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueRotativeMultiNodeAbstractTest.java ---------------------------------------------------------------------- diff --cc 
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueRotativeMultiNodeAbstractTest.java index 87de89c,44568eb..847ffe1 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueRotativeMultiNodeAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueRotativeMultiNodeAbstractTest.java @@@ -20,13 -20,16 +20,13 @@@ package org.apache.ignite.internal.proc import org.apache.ignite.*; import org.apache.ignite.cache.datastructures.*; import org.apache.ignite.configuration.*; + import org.apache.ignite.internal.util.tostring.*; + import org.apache.ignite.internal.util.typedef.*; + import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; +import org.apache.ignite.marshaller.optimized.*; import org.apache.ignite.resources.*; - import org.apache.ignite.internal.util.typedef.*; - import org.apache.ignite.internal.util.typedef.internal.*; - import org.apache.ignite.internal.util.tostring.*; -import org.apache.ignite.spi.discovery.tcp.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; import org.apache.ignite.testframework.*; -import org.apache.ignite.testframework.junits.common.*; import java.util.*; import java.util.concurrent.*; @@@ -157,9 -164,9 +157,9 @@@ public abstract class GridCacheQueueRot Thread th = new Thread(new Runnable() { @Override public void run() { try { - assert grid(1).compute().call(new TakeJob(queueName)); + assert grid(1).compute().call(new TakeJob(queueName, collocatedCollectionConfiguration())); } - catch (IgniteCheckedException e) { + catch (IgniteException e) { error(e.getMessage(), e); } }