http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/streamer/index/StreamerIndexUpdateSync.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/index/StreamerIndexUpdateSync.java b/modules/core/src/main/java/org/apache/ignite/streamer/index/StreamerIndexUpdateSync.java deleted file mode 100644 index 838f9af..0000000 --- a/modules/core/src/main/java/org/apache/ignite/streamer/index/StreamerIndexUpdateSync.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.streamer.index; - -import org.apache.ignite.internal.util.typedef.internal.*; - -/** - * Streamer index update synchronizer. - * <p> - * Used in {@link StreamerIndexProvider} to synchronize - * operations on window index. - * - * @see StreamerIndexProvider - * - */ -public class StreamerIndexUpdateSync { - /** */ - private volatile int res; - - /** - * Waits for a notification from another thread, which - * should call {@link #finish(int)} with an operation result. - * That result is returned by this method. - * - * @return Operation results, passed to {@link #finish(int)}. - * @throws InterruptedException If wait was interrupted. - */ - public int await() throws InterruptedException { - int res0 = res; - - if (res0 == 0) { - synchronized (this) { - while ((res0 = res) == 0) - wait(); - } - } - - assert res0 != 0; - - return res0; - } - - /** - * Notifies all waiting threads to finish waiting. - * - * @param res Operation result to return from {@link #await()}. - */ - public void finish(int res) { - assert res != 0; - - synchronized (this) { - this.res = res; - - notifyAll(); - } - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(StreamerIndexUpdateSync.class, this); - } -}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/streamer/index/StreamerIndexUpdater.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/index/StreamerIndexUpdater.java b/modules/core/src/main/java/org/apache/ignite/streamer/index/StreamerIndexUpdater.java deleted file mode 100644 index ebb3a97..0000000 --- a/modules/core/src/main/java/org/apache/ignite/streamer/index/StreamerIndexUpdater.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.streamer.index; - -import org.apache.ignite.*; -import org.jetbrains.annotations.*; - -/** - * Index updater. The main responsibility of index updater is to maintain index values - * up to date whenever events are added or removed from window. - * <p> - * Updater is provided to index provider in configuration usually via - * {@link StreamerIndexProviderAdapter#setUpdater(StreamerIndexUpdater)} method. - */ -public interface StreamerIndexUpdater<E, K, V> { - /** - * Given an event, extract index key. For example, if you have a 'Person' object - * with field 'age' and need to index based on this field, then this method - * should return the value of age field. - * <p> - * If {@code null} is returned then event will be ignored by the index. - * - * @param evt Event being added or removed from the window. - * @return Index key for this event. - */ - @Nullable public K indexKey(E evt); - - /** - * Gets initial value for the index or {@code null} if event should be ignored. - * This method is called every time when an entry is added to the window in - * order to get initial value for given key. - * - * @param evt Event being added to or removed from window. - * @param key Index key return by {@link #indexKey(Object)} method. - * @return Initial value for given key, if {@code null} then event will be - * ignored and index entry will not be created. - */ - @Nullable public V initialValue(E evt, K key); - - /** - * Callback invoked whenever an event is being added to the window. Given a key and - * a current index value for this key, the implementation should return the new - * value for this key. If returned value is {@code null}, then current entry will - * be removed from the index. - * <p> - * If index is sorted, then sorting happens based on the returned value. - * - * @param entry Current index entry. - * @param evt New event. - * @return New index value for given key, if {@code null}, then current - * index entry will be removed the index. - * @throws IgniteException If entry should not be added to index (e.g. if uniqueness is violated). - */ - @Nullable public V onAdded(StreamerIndexEntry<E, K, V> entry, E evt) throws IgniteException; - - /** - * Callback invoked whenever an event is being removed from the window and has - * index entry for given key. If there was no entry for given key, then - * {@code onRemoved()} will not be called. - * <p> - * Given a key and a current index value for this key, the implementation should return the new - * value for this key. If returned value is {@code null}, then current entry will - * be removed from the index. - * <p> - * If index is sorted, then sorting happens based on the returned value. - * - * @param entry Current index entry. - * @param evt Event being removed from the window. - * @return New index value for given key, if {@code null}, then current - * index entry will be removed the index. - */ - @Nullable public V onRemoved(StreamerIndexEntry<E, K, V> entry, E evt); -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/streamer/index/hash/StreamerHashIndexProvider.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/index/hash/StreamerHashIndexProvider.java b/modules/core/src/main/java/org/apache/ignite/streamer/index/hash/StreamerHashIndexProvider.java deleted file mode 100644 index 74427c4..0000000 --- a/modules/core/src/main/java/org/apache/ignite/streamer/index/hash/StreamerHashIndexProvider.java +++ /dev/null @@ -1,500 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.streamer.index.hash; - -import com.romix.scala.collection.concurrent.*; -import org.apache.ignite.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.streamer.index.*; -import org.jetbrains.annotations.*; - -import java.util.*; - -import static org.apache.ignite.streamer.index.StreamerIndexPolicy.*; - -/** - * Hash index implementation of a {@link org.apache.ignite.streamer.index.StreamerIndexProvider}. - * <p> - * This implementation uses a concurrent hash map, which is extremely - * efficient for CRUD operations. It does not, however, maintain the - * ordering of entries, so, operations which imply ordering are not - * supported. - * <p> - * If ordering is required, consider using {@link org.apache.ignite.streamer.index.tree.StreamerTreeIndexProvider}. - * - * @see org.apache.ignite.streamer.index.tree.StreamerTreeIndexProvider - * - */ -public class StreamerHashIndexProvider<E, K, V> extends StreamerIndexProviderAdapter<E, K, V> { - /** */ - private TrieMap<K, Entry<E, K, V>> key2Entry; - - /** */ - private final ThreadLocal<State<E, K, V>> state = new ThreadLocal<>(); - - /** {@inheritDoc} */ - @Override protected StreamerIndex<E, K, V> index0() { - return new Index<>(); - } - - /** {@inheritDoc} */ - @Override public void initialize() { - key2Entry = new TrieMap<>(); - } - - /** {@inheritDoc} */ - @Override public void reset0() { - // This will recreate maps. - initialize(); - } - - /** {@inheritDoc} */ - @Override protected void add(E evt, K key, StreamerIndexUpdateSync sync) { - State<E, K, V> state0 = state.get(); - - if (state0 != null) - throw new IllegalStateException("Previous operation has not been finished: " + state0); - - Entry<E, K, V> oldEntry = trieGet(key, key2Entry); - - StreamerIndexUpdater<E, K, V> updater = getUpdater(); - - if (oldEntry == null) { - V val = updater.initialValue(evt, key); - - if (val == null) - return; // Ignore event. - - state0 = new State<>(null, null, false); - - state.set(state0); - - Entry<E, K, V> newEntry = newEntry(key, val, null, evt); - - // Save new entry to state. - state0.newEntry(newEntry); - - // Put new entry. - Entry<E, K, V> rmv = key2Entry.put(key, newEntry); - - assert rmv == null; - - // Update passed. - state0.finished(true); - } - else { - if (isUnique()) - throw new IgniteException("Index unique key violation [evt=" + evt + ", key=" + key + ']'); - - V val = updater.onAdded(oldEntry, evt); - - if (val == null) { - remove(evt, key, sync); - - return; - } - - state0 = new State<>(oldEntry, null, false); - - state.set(state0); - - Entry<E, K, V> newEntry = addEvent(oldEntry, key, val, null, evt); - - // Save new entry to state. - state0.newEntry(newEntry); - - // Replace former entry with the new one. - Entry<E, K, V> rmv = key2Entry.put(key, newEntry); - - assert rmv != null; - - // Update passed. - state0.finished(true); - } - } - - /** {@inheritDoc} */ - @Override protected void remove(E evt, K key, StreamerIndexUpdateSync sync) { - State<E, K, V> state0 = state.get(); - - if (state0 != null) - throw new IllegalStateException("Previous operation has not been finished: " + state0); - - Entry<E, K, V> oldEntry = trieGet(key, key2Entry); - - if (oldEntry == null) - return; - - StreamerIndexUpdater<E, K, V> updater = getUpdater(); - - V val = updater.onRemoved(oldEntry, evt); - - if (val == null) { - state0 = new State<>(oldEntry, null, false); - - state.set(state0); - - boolean b = key2Entry.remove(key, oldEntry); - - assert b; - - state0.finished(true); - } - else { - state0 = new State<>(oldEntry, null, false); - - state.set(state0); - - Entry<E, K, V> newEntry = removeEvent(oldEntry, key, val, null, evt); - - // Save new entry to state. - state0.newEntry(newEntry); - - // Replace former entry with the new one. - Entry<E, K, V> rmv = key2Entry.put(key, newEntry); - - assert rmv != null; - - state0.finished(true); - } - } - - /** {@inheritDoc} */ - @Override protected void endUpdate0(StreamerIndexUpdateSync sync, E evt, K key, boolean rollback) { - State<E, K, V> state0 = state.get(); - - if (state0 == null) - return; - - state.remove(); - - if (rollback && state0.finished()) { - Entry<E, K, V> oldEntry = state0.oldEntry(); - Entry<E, K, V> newEntry = state0.newEntry(); - - // Rollback after index was updated. - if (oldEntry != null && newEntry != null) { - boolean b = key2Entry.replace(key, newEntry, oldEntry); - - assert b; - } - else if (newEntry == null) { - // Old was removed. Need to put it back. - Entry<E, K, V> old = key2Entry.put(key, oldEntry); - - assert old == null; - } - else { - assert oldEntry == null; - - // New entry was added. Remove it. - boolean b = key2Entry.remove(key, newEntry); - - assert b; - } - } - } - - /** {@inheritDoc} */ - @Override public boolean sorted() { - return false; - } - - /** - * - */ - private class Index<I extends IndexKey<V>> implements StreamerIndex<E, K, V> { - /** */ - private final TrieMap<K, Entry<E, K, V>> key2Entry0 = key2Entry.readOnlySnapshot(); - - /** */ - private final int evtsCnt = eventsCount(); - - /** {@inheritDoc} */ - @Nullable @Override public String name() { - return getName(); - } - - /** {@inheritDoc} */ - @Override public boolean unique() { - return isUnique(); - } - - /** {@inheritDoc} */ - @Override public boolean sorted() { - return false; - } - - /** {@inheritDoc} */ - @Override public StreamerIndexPolicy policy() { - return getPolicy(); - } - - /** {@inheritDoc} */ - @Override public int size() { - return key2Entry0.size(); - } - - /** {@inheritDoc} */ - @Nullable @Override public StreamerIndexEntry<E, K, V> entry(K key) { - A.notNull(key, "key"); - - return trieGet(key, key2Entry0); - } - - /** {@inheritDoc} */ - @Override public Collection<StreamerIndexEntry<E, K, V>> entries(int cnt) { - A.ensure(cnt >= 0, "cnt >= 0"); - - Collection vals = Collections.unmodifiableCollection(key2Entry0.values()); - - return (Collection<StreamerIndexEntry<E, K, V>>)(cnt == 0 ? vals : F.limit(vals, cnt)); - } - - /** {@inheritDoc} */ - @Override public Set<K> keySet(int cnt) { - A.ensure(cnt >= 0, "cnt >= 0"); - - return cnt == 0 ? Collections.unmodifiableSet(key2Entry0.keySet()) : - F.limit(key2Entry0.keySet(), cnt); - } - - /** {@inheritDoc} */ - @Override public Collection<V> values(int cnt) { - Collection<StreamerIndexEntry<E, K, V>> col = entries(cnt); - - return F.viewReadOnly(col, entryToVal); - } - - /** {@inheritDoc} */ - @Override public Collection<E> events(int cnt) { - A.ensure(cnt >= 0, "cnt >= 0"); - - if (getPolicy() == EVENT_TRACKING_OFF) - throw new IllegalStateException("Event tracking is off: " + this); - - Collection<E> evts = new AbstractCollection<E>() { - @NotNull @Override public Iterator<E> iterator() { - return new Iterator<E>() { - private final Iterator<Entry<E, K, V>> entryIter = key2Entry0.values().iterator(); - - private Iterator<E> evtIter; - - private boolean moved = true; - - private boolean more; - - @Override public boolean hasNext() { - if (!moved) - return more; - - moved = false; - - if (evtIter != null && evtIter.hasNext()) - return more = true; - - while (entryIter.hasNext()) { - evtIter = eventsIterator(entryIter.next()); - - if (evtIter.hasNext()) - return more = true; - } - - return more = false; - } - - @Override public E next() { - if (hasNext()) { - moved = true; - - return evtIter.next(); - } - - throw new NoSuchElementException(); - } - - @Override public void remove() { - assert false; - } - }; - } - - @Override public int size() { - return evtsCnt; - } - - /** - * @param entry Entry. - * @return Events iterator. - */ - @SuppressWarnings("fallthrough") - Iterator<E> eventsIterator(StreamerIndexEntry<E,K,V> entry) { - switch (getPolicy()) { - case EVENT_TRACKING_ON: - case EVENT_TRACKING_ON_DEDUP: - Collection<E> evts = entry.events(); - - assert evts != null; - - return evts.iterator(); - - default: - assert false; - - throw new IllegalStateException("Event tracking is off: " + Index.this); - } - } - }; - - - return cnt == 0 ? evts : F.limit(evts, cnt); - } - - /** {@inheritDoc} */ - @Override public Set<StreamerIndexEntry<E, K, V>> entrySet(V val) { - return entrySet(true, val, true, val, true); - } - - /** {@inheritDoc} */ - @Override public Set<StreamerIndexEntry<E, K, V>> entrySet(final boolean asc, @Nullable final V fromVal, - final boolean fromIncl, @Nullable final V toVal, final boolean toIncl) { - throw new UnsupportedOperationException("Operation is not supported on hash index."); - } - - /** {@inheritDoc} */ - @Override public Set<K> keySet(V val) { - throw new UnsupportedOperationException("Operation is not supported on hash index."); - } - - /** {@inheritDoc} */ - @Override public Set<K> keySet(final boolean asc, @Nullable final V fromVal, final boolean fromIncl, - @Nullable final V toVal, final boolean toIncl) { - throw new UnsupportedOperationException("Operation is not supported on hash index."); - } - - /** {@inheritDoc} */ - @Override public Collection<V> values(boolean asc, @Nullable V fromVal, boolean fromIncl, @Nullable V toVal, - boolean toIncl) { - throw new UnsupportedOperationException("Operation is not supported on hash index."); - } - - /** {@inheritDoc} */ - @Override public Collection<E> events(V val) { - throw new UnsupportedOperationException("Operation is not supported on hash index."); - } - - /** {@inheritDoc} */ - @Override public Collection<E> events(final boolean asc, @Nullable final V fromVal, final boolean fromIncl, - @Nullable final V toVal, final boolean toIncl) { - throw new UnsupportedOperationException("Operation is not supported on hash index."); - } - - /** {@inheritDoc} */ - @Nullable @Override public StreamerIndexEntry<E, K, V> firstEntry() { - throw new UnsupportedOperationException("Operation is not supported on hash index."); - } - - /** {@inheritDoc} */ - @Nullable @Override public StreamerIndexEntry<E, K, V> lastEntry() { - throw new UnsupportedOperationException("Operation is not supported on hash index."); - } - - /** {@inheritDoc} */ - @Override public Iterator<StreamerIndexEntry<E, K, V>> iterator() { - return entries(0).iterator(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(Index.class, this, "provider", StreamerHashIndexProvider.this); - } - } - - /** - * - */ - private static class State<E, K, V> { - /** */ - private Entry<E, K, V> oldEntry; - - /** */ - private Entry<E, K, V> newEntry; - - /** */ - private boolean finished; - - /** - * @param oldEntry Old. - * @param newEntry New. - * @param finished Finished. - */ - private State(@Nullable Entry<E, K, V> oldEntry, @Nullable Entry<E, K, V> newEntry, boolean finished) { - this.oldEntry = oldEntry; - this.newEntry = newEntry; - this.finished = finished; - } - - /** - * @return Old. - */ - Entry<E, K, V> oldEntry() { - return oldEntry; - } - - /** - * @param oldEntry Old. - */ - void oldEntry(Entry<E, K, V> oldEntry) { - this.oldEntry = oldEntry; - } - - /** - * @return New. - */ - Entry<E, K, V> newEntry() { - return newEntry; - } - - /** - * @param newEntry New. - */ - void newEntry(Entry<E, K, V> newEntry) { - this.newEntry = newEntry; - } - - /** - * @return Finished. - */ - boolean finished() { - return finished; - } - - /** - * @param finished Finished. - */ - void finished(boolean finished) { - this.finished = finished; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(State.class, this); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/streamer/index/hash/package-info.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/index/hash/package-info.java b/modules/core/src/main/java/org/apache/ignite/streamer/index/hash/package-info.java deleted file mode 100644 index cdb726b..0000000 --- a/modules/core/src/main/java/org/apache/ignite/streamer/index/hash/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Contains hash-based streamer index implementation. - */ -package org.apache.ignite.streamer.index.hash; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/streamer/index/package-info.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/index/package-info.java b/modules/core/src/main/java/org/apache/ignite/streamer/index/package-info.java deleted file mode 100644 index b20b74b..0000000 --- a/modules/core/src/main/java/org/apache/ignite/streamer/index/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Contains APIs for indexing of streamer windows. - */ -package org.apache.ignite.streamer.index; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/streamer/index/tree/StreamerTreeIndexProvider.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/index/tree/StreamerTreeIndexProvider.java b/modules/core/src/main/java/org/apache/ignite/streamer/index/tree/StreamerTreeIndexProvider.java deleted file mode 100644 index 9c17791..0000000 --- a/modules/core/src/main/java/org/apache/ignite/streamer/index/tree/StreamerTreeIndexProvider.java +++ /dev/null @@ -1,953 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.streamer.index.tree; - -import com.romix.scala.collection.concurrent.*; -import org.apache.ignite.*; -import org.apache.ignite.internal.util.snaptree.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.streamer.index.*; -import org.jetbrains.annotations.*; - -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -import static org.apache.ignite.streamer.index.StreamerIndexPolicy.*; - -/** - * Tree index implementation of a {@link org.apache.ignite.streamer.index.StreamerIndexProvider}. - * <p> - * The advantage of a tree index is that it maintains entries in a - * sorted order, which is invaluable for many kinds of tasks, where - * event ordering makes sense (like {@code GridStreamingPopularNumbersExample}). - * The drawback is that the index entry values should be comparable to each other, - * and you'll are likely to need to implement a custom comparator for values in - * place of a default one. - * <p> - * If ordering is not required, consider using {@link org.apache.ignite.streamer.index.hash.StreamerHashIndexProvider} - * instead, which is more efficient (O(1) vs. O(log(n))) and does not require - * comparability. - * - * @see org.apache.ignite.streamer.index.hash.StreamerHashIndexProvider - * - */ -public class StreamerTreeIndexProvider<E, K, V> extends StreamerIndexProviderAdapter<E, K, V> { - /** */ - private SnapTreeMap<IndexKey<V>, Entry<E, K, V>> idx; - - /** */ - private TrieMap<K, Entry<E, K, V>> key2Entry; - - /** */ - private final AtomicLong idxGen = new AtomicLong(); - - /** */ - private Comparator<V> cmp; - - /** */ - private final ThreadLocal<State<E, K, V>> state = new ThreadLocal<>(); - - /** - * Sets comparator. - * - * @param cmp Comparator. - */ - public void setComparator(Comparator<V> cmp) { - this.cmp = cmp; - } - - /** {@inheritDoc} */ - @Override protected StreamerIndex<E, K, V> index0() { - return new Index<>(); - } - - /** {@inheritDoc} */ - @Override public void initialize() { - idx = cmp == null ? new SnapTreeMap<IndexKey<V>, Entry<E, K, V>>() : - new SnapTreeMap<IndexKey<V>, Entry<E, K, V>>(new Comparator<IndexKey<V>>() { - @Override public int compare(IndexKey<V> o1, IndexKey<V> o2) { - int res = cmp.compare(o1.value(), o2.value()); - - return res != 0 || isUnique() ? res : - ((Key<V>)o1).seed > ((Key<V>)o2).seed ? 1 : - ((Key<V>)o1).seed == ((Key<V>)o2).seed ? 0 : -1; - } - }); - - key2Entry = new TrieMap<>(); - } - - /** {@inheritDoc} */ - @Override public void reset0() { - // This will recreate maps. - initialize(); - } - - /** {@inheritDoc} */ - @Override protected void add(E evt, K key, StreamerIndexUpdateSync sync) { - State<E, K, V> state0 = state.get(); - - if (state0 != null) - throw new IllegalStateException("Previous operation has not been finished: " + state0); - - Entry<E, K, V> oldEntry = trieGet(key, key2Entry); - - StreamerIndexUpdater<E, K, V> updater = getUpdater(); - - if (oldEntry == null) { - V val = updater.initialValue(evt, key); - - if (val == null) - return; // Ignore event. - - IndexKey<V> idxKey = nextKey(val); - - state0 = new State<>(null, null, idxKey, null, false, false); - - if (isUnique()) - // Lock new key. - lockIndexKey(idxKey, sync); - - state.set(state0); - - Entry<E, K, V> newEntry = newEntry(key, val, idxKey, evt); - - // Save new entry to state. - state0.newEntry(newEntry); - - // Put new value to index. - Entry<E, K, V> old = idx.putIfAbsent(idxKey, newEntry); - - if (isUnique()) { - if (old != null) - throw new IgniteException("Index unique key violation [evt=" + evt + ", key=" + key + - ", idxKey=" + idxKey + ']'); - } - else - assert old == null; - - // Put new entry. - Entry<E, K, V> rmv = key2Entry.put(key, newEntry); - - assert rmv == null; - - // Update passed. - state0.finished(true); - } - else { - V val = updater.onAdded(oldEntry, evt); - - if (val == null) { - remove(evt, key, sync); - - return; - } - - IndexKey<V> newIdxKey = nextKey(val); - - IndexKey<V> oldIdxKey = oldEntry.keyIndex(); - - assert oldIdxKey != null; // Shouldn't be null for tree index. - - int order = compareKeys(oldIdxKey, newIdxKey); - - state0 = new State<>(oldIdxKey, oldEntry, newIdxKey, null, false, order == 0); - - if (isUnique()) { - if (order == 0) - // Keys are equal. - lockIndexKey(newIdxKey, sync); - else - lockKeys(oldIdxKey, newIdxKey, order, sync); - } - - state.set(state0); - - Entry<E, K, V> newEntry = addEvent(oldEntry, key, val, newIdxKey, evt); - - // Save new entry to state. - state0.newEntry(newEntry); - - if (state0.keysEqual()) { - assert isUnique(); - - boolean b = idx.replace(newIdxKey, oldEntry, newEntry); - - assert b; - } - else { - // Put new value to index with new key. - Entry<E, K, V> old = idx.putIfAbsent(newIdxKey, newEntry); - - if (isUnique()) { - if (old != null) - throw new IgniteException("Index unique key violation [evt=" + evt + ", key=" + key + - ", idxKey=" + newIdxKey + ']'); - } - else - assert old == null; - - boolean rmv0 = idx.remove(oldIdxKey, oldEntry); - - assert rmv0; - } - - // Replace former entry with the new one. - boolean b = key2Entry.replace(key, oldEntry, newEntry); - - assert b; - - // Update passed. - state0.finished(true); - } - } - - /** {@inheritDoc} */ - @Override protected void remove(E evt, K key, StreamerIndexUpdateSync sync) { - State<E, K, V> state0 = state.get(); - - if (state0 != null) - throw new IllegalStateException("Previous operation has not been finished: " + state0); - - Entry<E, K, V> oldEntry = trieGet(key, key2Entry); - - if (oldEntry == null) - return; - - StreamerIndexUpdater<E, K, V> updater = getUpdater(); - - V val = updater.onRemoved(oldEntry, evt); - - IndexKey<V> oldIdxKey = oldEntry.keyIndex(); - - assert oldIdxKey != null; // Shouldn't be null for tree index. - - if (val == null) { - state0 = new State<>(oldIdxKey, oldEntry, null, null, false, false); - - if (isUnique()) - // Lock old key. - lockIndexKey(oldIdxKey, sync); - - state.set(state0); - - boolean b = idx.remove(oldIdxKey, oldEntry); - - assert b; - - b = key2Entry.remove(key, oldEntry); - - assert b; - - state0.finished(true); - } - else { - IndexKey<V> newIdxKey = nextKey(val); - - int order = compareKeys(oldIdxKey, newIdxKey); - - state0 = new State<>(oldIdxKey, oldEntry, newIdxKey, null, false, order == 0); - - if (isUnique()) { - if (order == 0) - // Keys are equal. - lockIndexKey(newIdxKey, sync); - else - lockKeys(oldIdxKey, newIdxKey, order, sync); - } - - state.set(state0); - - Entry<E, K, V> newEntry = removeEvent(oldEntry, key, val, newIdxKey, evt); - - // Save new entry to state. - state0.newEntry(newEntry); - - if (state0.keysEqual()) { - assert isUnique(); - - boolean b = idx.replace(newIdxKey, oldEntry, newEntry); - - assert b; - } - else { - // Put new value to index with new key. - Entry<E, K, V> old = idx.putIfAbsent(newIdxKey, newEntry); - - if (isUnique()) { - if (old != null) - throw new IgniteException("Index unique key violation [evt=" + evt + ", key=" + key + - ", idxKey=" + newIdxKey + ']'); - } - else - assert old == null; - - boolean rmv0 = idx.remove(oldIdxKey, oldEntry); - - assert rmv0; - } - - // Replace former entry with the new one. - boolean b = key2Entry.replace(key, oldEntry, newEntry); - - assert b; - - state0.finished(true); - } - } - - /** - * @param key1 Key. - * @param key2 Key. - * @param order Keys comparison result. - * @param sync Sync. - * @throws IgniteException If interrupted. - */ - private void lockKeys(IndexKey<V> key1, IndexKey<V> key2, int order, StreamerIndexUpdateSync sync) - throws IgniteException { - assert isUnique(); - assert key1 != null; - assert key2 != null; - assert order != 0; - - boolean success = false; - - try { - if (order > 0) { - lockIndexKey(key1, sync); - lockIndexKey(key2, sync); - } - else { - // Reverse order. - lockIndexKey(key2, sync); - lockIndexKey(key1, sync); - } - - success = true; - } - finally { - if (!success) { - unlockIndexKey(key1, sync); - unlockIndexKey(key2, sync); - } - } - } - - /** - * @param key1 Key. - * @param key2 Key. - * @return Comparison result. - */ - private int compareKeys(IndexKey<V> key1, IndexKey<V> key2) { - assert key1 != null; - assert key2 != null; - - return cmp != null ? cmp.compare(key1.value(), key2.value()) : - ((Comparable<V>)key1.value()).compareTo(key2.value()); - } - - /** {@inheritDoc} */ - @Override protected void endUpdate0(StreamerIndexUpdateSync sync, E evt, K key, boolean rollback) { - State<E, K, V> state0 = state.get(); - - if (state0 == null) - return; - - state.remove(); - - IndexKey<V> oldIdxKey = state0.oldIndexKey(); - Entry<E, K, V> oldEntry = state0.oldEntry(); - IndexKey<V> newIdxKey = state0.newIndexKey(); - Entry<E, K, V> newEntry = state0.newEntry(); - - if (rollback && state0.finished()) { - // Rollback after index was updated. - if (oldEntry != null && newEntry != null) { - if (state0.keysEqual()) { - assert isUnique(); - - boolean b = idx.replace(oldIdxKey, newEntry, oldEntry); - - assert b; - } - else { - boolean b = idx.remove(newIdxKey, newEntry); - - assert b; - - Entry<E, K, V> old = idx.put(oldIdxKey, oldEntry); - - assert old == null; - - b = key2Entry.replace(key, newEntry, oldEntry); - - assert b; - } - } - else if (newEntry == null) { - // Old was removed. Need to put it back. - Entry<E, K, V> old = key2Entry.put(key, oldEntry); - - assert old == null; - - old = idx.put(oldIdxKey, oldEntry); - - assert old == null; - } - else { - assert oldEntry == null; - - // New entry was added. Remove it. - boolean b = idx.remove(newIdxKey, newEntry); - - assert b; - - b = key2Entry.remove(key, newEntry); - - assert b; - } - } - - // Unlock only if unique. - if (isUnique()) { - if (oldIdxKey != null) - unlockIndexKey(oldIdxKey, sync); - - if (state0.keysEqual()) - // No need to unlock second key. - return; - - if (newIdxKey != null) - unlockIndexKey(newIdxKey, sync); - } - } - - /** - * @param val Value. - * @return Index key. - */ - protected IndexKey<V> nextKey(V val) { - return new Key<>(val, isUnique() ? 0 : idxGen.incrementAndGet(), cmp); - } - - /** - * @param val Value. - * @param asc {@code True} if ascending. - * @param incl {@code True} if inclusive. - * @return Key for search. - */ - private IndexKey<V> searchKeyFrom(V val, boolean asc, boolean incl) { - // (asc && incl) || (!asc && !incl) -> asc == incl - return new Key<>(val, asc == incl ? Long.MIN_VALUE : Long.MAX_VALUE, cmp); - } - - /** - * @param val Value. - * @param asc {@code True} if ascending. - * @param incl {@code True} if inclusive. - * @return Key for search. - */ - private IndexKey<V> searchKeyTo(V val, boolean asc, boolean incl) { - // (asc && incl) || (!asc && !incl) -> asc == incl - return new Key<>(val, asc == incl ? Long.MAX_VALUE : Long.MIN_VALUE, cmp); - } - - /** {@inheritDoc} */ - @Override public boolean sorted() { - return true; - } - - /** - * - */ - private static class Key<V> implements Comparable<Key<V>>, IndexKey<V> { - /** */ - private final V val; - - /** */ - private final long seed; - - /** */ - private final Comparator<V> cmp; - - /** - * @param val Value. - * @param seed Seed. - * @param cmp Comparator. - */ - private Key(V val, long seed, @Nullable Comparator<V> cmp) { - assert val != null; - - this.val = val; - this.seed = seed; - this.cmp = cmp; - } - - /** {@inheritDoc} */ - @Override public V value() { - return val; - } - - /** {@inheritDoc} */ - @Override public int compareTo(Key<V> o) { - int res = cmp != null ? cmp.compare(val, o.val) : ((Comparable<V>)val).compareTo(o.val); - - return res == 0 ? (seed < o.seed ? -1 : seed > o.seed ? 1 : 0) : res; - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - return 31 * val.hashCode() + (int)(seed ^ (seed >>> 32)); - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object obj) { - if (obj == null || obj.getClass() != Key.class) - return false; - - Key key = (Key)obj; - - return seed == key.seed && val.equals(key.val); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(Key.class, this); - } - } - - /** - * - */ - private static class State<E, K, V> { - /** */ - private IndexKey<V> oldIdxKey; - - /** */ - private Entry<E, K, V> oldEntry; - - /** */ - private IndexKey<V> newIdxKey; - - /** */ - private Entry<E, K, V> newEntry; - - /** */ - private boolean finished; - - /** */ - private final boolean keysEqual; - - /** - * @param oldIdxKey Old index key. - * @param oldEntry Old entry. - * @param newIdxKey New Index key. - * @param newEntry New entry. - * @param finished Finished. - * @param keysEqual {@code True} if keys are equal. - */ - private State(@Nullable IndexKey<V> oldIdxKey, @Nullable Entry<E, K, V> oldEntry, @Nullable IndexKey<V> newIdxKey, - @Nullable Entry<E, K, V> newEntry, boolean finished, boolean keysEqual) { - this.oldIdxKey = oldIdxKey; - this.oldEntry = oldEntry; - this.newIdxKey = newIdxKey; - this.newEntry = newEntry; - this.finished = finished; - this.keysEqual = keysEqual; - } - - /** - * @return Old index entry. - */ - IndexKey<V> oldIndexKey() { - return oldIdxKey; - } - - /** - * @param oldIdxKey Old index key. - */ - void oldIndexKey(IndexKey<V> oldIdxKey) { - this.oldIdxKey = oldIdxKey; - } - - /** - * @return Old. - */ - Entry<E, K, V> oldEntry() { - return oldEntry; - } - - /** - * @param oldEntry Old. - */ - void oldEntry(Entry<E, K, V> oldEntry) { - this.oldEntry = oldEntry; - } - - /** - * @return New index key. - */ - IndexKey<V> newIndexKey() { - return newIdxKey; - } - - /** - * @param newIdxKey New index key. - */ - void newIndexKey(IndexKey<V> newIdxKey) { - this.newIdxKey = newIdxKey; - } - - /** - * @return New. - */ - Entry<E, K, V> newEntry() { - return newEntry; - } - - /** - * @param newEntry New. - */ - void newEntry(Entry<E, K, V> newEntry) { - this.newEntry = newEntry; - } - - /** - * @return Finished. - */ - boolean finished() { - return finished; - } - - /** - * @param finished Finished. - */ - void finished(boolean finished) { - this.finished = finished; - } - - /** - * @return {@code True} if both keys are not null and are equal (as comparables). - */ - boolean keysEqual() { - return keysEqual; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(State.class, this); - } - } - - /** - * - */ - private class Index<I extends IndexKey<V>> implements StreamerIndex<E, K, V> { - /** */ - private final TrieMap<K, Entry<E, K, V>> key2Entry0 = key2Entry.readOnlySnapshot(); - - /** */ - private final SnapTreeMap<IndexKey<V>, Entry<E, K, V>> idx0 = idx.clone(); - - /** */ - private final int evtsCnt = eventsCount(); - - /** {@inheritDoc} */ - @Nullable @Override public String name() { - return getName(); - } - - /** {@inheritDoc} */ - @Override public boolean unique() { - return isUnique(); - } - - /** {@inheritDoc} */ - @Override public boolean sorted() { - return true; - } - - /** {@inheritDoc} */ - @Override public StreamerIndexPolicy policy() { - return getPolicy(); - } - - /** {@inheritDoc} */ - @Override public int size() { - return key2Entry0.size(); - } - - /** {@inheritDoc} */ - @Nullable @Override public StreamerIndexEntry<E, K, V> entry(K key) { - A.notNull(key, "key"); - - return trieGet(key, key2Entry0); - } - - /** {@inheritDoc} */ - @Override public Collection<StreamerIndexEntry<E, K, V>> entries(int cnt) { - Collection col = cnt >= 0 ? idx0.values() : idx0.descendingMap().values(); - - return (Collection<StreamerIndexEntry<E, K, V>>)(cnt == 0 ? Collections.unmodifiableCollection(col) : - F.limit(col, U.safeAbs(cnt))); - } - - /** {@inheritDoc} */ - @Override public Set<K> keySet(final int cnt) { - Set<K> col = new AbstractSet<K>() { - private Collection<K> entries = F.viewReadOnly( - cnt >= 0 ? idx0.values() : idx0.descendingMap().values(), - entryToKey); - - @NotNull @Override public Iterator<K> iterator() { - return entries.iterator(); - } - - @Override public int size() { - return entries.size(); - } - }; - - return cnt == 0 ? col : F.limit(col, U.safeAbs(cnt)); - } - - /** {@inheritDoc} */ - @Override public Collection<V> values(int cnt) { - Collection<StreamerIndexEntry<E, K, V>> col = entries(cnt); - - return F.viewReadOnly(col, entryToVal); - } - - /** {@inheritDoc} */ - @Override public Collection<E> events(int cnt) { - Collection<E> evts = events(cnt >= 0, null, false, null, false); - - return cnt == 0 ? evts : F.limit(evts, U.safeAbs(cnt)); - } - - /** {@inheritDoc} */ - @Override public Set<StreamerIndexEntry<E, K, V>> entrySet(V val) { - return entrySet(true, val, true, val, true); - } - - /** {@inheritDoc} */ - @Override public Set<StreamerIndexEntry<E, K, V>> entrySet(final boolean asc, @Nullable final V fromVal, - final boolean fromIncl, @Nullable final V toVal, final boolean toIncl) { - Set<StreamerIndexEntry<E, K, V>> set = new AbstractSet<StreamerIndexEntry<E, K, V>>() { - private Map<IndexKey<V>, Entry<E, K, V>> map = subMap(asc, fromVal, fromIncl, toVal, toIncl); - - @NotNull @Override public Iterator<StreamerIndexEntry<E, K, V>> iterator() { - Collection vals = map.values(); - - return (Iterator<StreamerIndexEntry<E, K, V>>)vals.iterator(); - } - - @Override public int size() { - return map.size(); - } - }; - - return Collections.unmodifiableSet(set); - } - - /** {@inheritDoc} */ - @Override public Set<K> keySet(V val) { - return keySet(true, val, true, val, true); - } - - /** {@inheritDoc} */ - @Override public Set<K> keySet(final boolean asc, @Nullable final V fromVal, final boolean fromIncl, - @Nullable final V toVal, final boolean toIncl) { - Set<K> set = new AbstractSet<K>() { - private Map<IndexKey<V>, Entry<E, K, V>> map = subMap(asc, fromVal, fromIncl, toVal, toIncl); - - @NotNull @Override public Iterator<K> iterator() { - return F.iterator(map.values(), entryToKey, true); - } - - @Override public int size() { - return map.size(); - } - }; - - return Collections.unmodifiableSet(set); - } - - /** {@inheritDoc} */ - @Override public Collection<V> values(boolean asc, @Nullable V fromVal, boolean fromIncl, @Nullable V toVal, - boolean toIncl) { - Map<IndexKey<V>, Entry<E, K, V>> map = subMap(asc, fromVal, fromIncl, toVal, toIncl); - - return F.viewReadOnly(map.values(), entryToVal); - } - - /** {@inheritDoc} */ - @Override public Collection<E> events(V val) { - A.notNull(val, "val"); - - return events(true, val, true, val, true); - } - - /** {@inheritDoc} */ - @Override public Collection<E> events(final boolean asc, @Nullable final V fromVal, final boolean fromIncl, - @Nullable final V toVal, final boolean toIncl) { - if (getPolicy() == EVENT_TRACKING_OFF) - throw new IllegalStateException("Event tracking is off: " + this); - - Collection<E> evts = new AbstractCollection<E>() { - private final Map<IndexKey<V>, Entry<E, K, V>> map = subMap(asc, fromVal, fromIncl, toVal, toIncl); - - private int size = -1; - - @NotNull @Override public Iterator<E> iterator() { - return new Iterator<E>() { - private final Iterator<Entry<E, K, V>> entryIter = map.values().iterator(); - - private Iterator<E> evtIter; - - private boolean moved = true; - - private boolean more; - - @Override public boolean hasNext() { - if (!moved) - return more; - - moved = false; - - if (evtIter != null && evtIter.hasNext()) - return more = true; - - while (entryIter.hasNext()) { - evtIter = eventsIterator(entryIter.next()); - - if (evtIter.hasNext()) - return more = true; - } - - return more = false; - } - - @Override public E next() { - if (hasNext()) { - moved = true; - - return evtIter.next(); - } - - throw new NoSuchElementException(); - } - - @Override public void remove() { - assert false; - } - }; - } - - @Override public int size() { - return size != -1 ? size : - fromVal == null && toVal == null ? (size = evtsCnt) : (size = F.size(iterator())); - } - - /** - * @param entry Entry. - * @return Events iterator. - */ - @SuppressWarnings("fallthrough") - Iterator<E> eventsIterator(StreamerIndexEntry<E,K,V> entry) { - switch (getPolicy()) { - case EVENT_TRACKING_ON: - case EVENT_TRACKING_ON_DEDUP: - Collection<E> evts = entry.events(); - - assert evts != null; - - return evts.iterator(); - - default: - assert false; - - throw new IllegalStateException("Event tracking is off: " + Index.this); - } - } - }; - - return Collections.unmodifiableCollection(evts); - } - - /** {@inheritDoc} */ - @Nullable @Override public StreamerIndexEntry<E, K, V> firstEntry() { - return idx0.firstEntry().getValue(); - } - - /** {@inheritDoc} */ - @Nullable @Override public StreamerIndexEntry<E, K, V> lastEntry() { - return idx0.lastEntry().getValue(); - } - - /** {@inheritDoc} */ - @Override public Iterator<StreamerIndexEntry<E, K, V>> iterator() { - return entries(0).iterator(); - } - - /** - * @param asc Ascending. - * @param fromVal From. - * @param fromIncl Include from. - * @param toVal To. - * @param toIncl Include to. - * @return Map. - */ - private Map<IndexKey<V>, Entry<E, K, V>> subMap(boolean asc, @Nullable V fromVal, boolean fromIncl, - @Nullable V toVal, boolean toIncl) { - if (fromVal != null && toVal != null) { - int cmpRes = cmp != null ? cmp.compare(toVal, fromVal) : ((Comparable<V>)toVal).compareTo(fromVal); - - if ((asc && cmpRes < 0) || (!asc && cmpRes > 0)) - throw new IllegalArgumentException("Boundaries are invalid [asc=" + asc + ", fromVal=" + fromVal + - ", toVal=" + toVal + ']'); - } - - if (idx0.isEmpty()) - return Collections.emptyMap(); - - ConcurrentNavigableMap<IndexKey<V>,Entry<E,K,V>> map = asc ? idx0 : idx0.descendingMap(); - - if (fromVal == null) { - fromVal = map.firstKey().value(); - - fromIncl = true; - } - - if (toVal == null) { - toVal = map.lastKey().value(); - - toIncl = true; - } - - return map.subMap(searchKeyFrom(fromVal, asc, fromIncl), fromIncl, searchKeyTo(toVal, asc, toIncl), toIncl); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(Index.class, this, "provider", StreamerTreeIndexProvider.this); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/streamer/index/tree/package-info.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/index/tree/package-info.java b/modules/core/src/main/java/org/apache/ignite/streamer/index/tree/package-info.java deleted file mode 100644 index 65f533c..0000000 --- a/modules/core/src/main/java/org/apache/ignite/streamer/index/tree/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Contains tree-based streamer index implementation. - */ -package org.apache.ignite.streamer.index.tree; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/streamer/package-info.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/package-info.java b/modules/core/src/main/java/org/apache/ignite/streamer/package-info.java deleted file mode 100644 index b4aeb02..0000000 --- a/modules/core/src/main/java/org/apache/ignite/streamer/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Contains main <b>Streaming APIs.</b> - */ -package org.apache.ignite.streamer; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/streamer/router/StreamerAffinityEventRouter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/router/StreamerAffinityEventRouter.java b/modules/core/src/main/java/org/apache/ignite/streamer/router/StreamerAffinityEventRouter.java deleted file mode 100644 index dee5a19..0000000 --- a/modules/core/src/main/java/org/apache/ignite/streamer/router/StreamerAffinityEventRouter.java +++ /dev/null @@ -1,150 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.streamer.router; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.internal.util.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.resources.*; -import org.apache.ignite.streamer.*; - -import java.util.*; - -/** - * Router used to colocate identical streamer events or events with identical affinity - * key on the same node. Such collocation is often required to perform computations on - * multiple events together, for example, find number of occurrences of a word in some - * text. In this case you would collocate identical words together to make sure that - * you can update their counts. - * <h1 class="header">Affinity Key</h1> - * Affinity key for collocation of event together on the same node is specified - * via {@link AffinityEvent#affinityKey()} method. If event does not implement - * {@link AffinityEvent} interface, then event itself will be used to determine affinity. - */ -public class StreamerAffinityEventRouter extends StreamerEventRouterAdapter { - /** */ - public static final int REPLICA_CNT = 128; - - /** - * All events that implement this interface will be routed based on key affinity. - */ - @SuppressWarnings("PublicInnerClass") - public interface AffinityEvent { - /** - * @return Affinity route key for the event. - */ - public Object affinityKey(); - } - - /** Grid instance. */ - @IgniteInstanceResource - private Ignite ignite; - - /** */ - private final GridConsistentHash<UUID> nodeHash = new GridConsistentHash<>(); - - /** */ - private Collection<UUID> addedNodes = new GridConcurrentHashSet<>(); - - /** {@inheritDoc} */ - @Override public <T> ClusterNode route(StreamerContext ctx, String stageName, T evt) { - return node(evt instanceof AffinityEvent ? ((AffinityEvent) evt).affinityKey() : - evt, ctx); - } - - /** - * @param obj Object. - * @param ctx Context. - * @return Rich node. - */ - private ClusterNode node(Object obj, StreamerContext ctx) { - while (true) { - Collection<ClusterNode> nodes = ctx.projection().nodes(); - - assert nodes != null; - assert !nodes.isEmpty(); - - int nodesSize = nodes.size(); - - if (nodesSize == 1) { // Minor optimization. - ClusterNode ret = F.first(nodes); - - assert ret != null; - - return ret; - } - - final Collection<UUID> lookup = U.newHashSet(nodesSize); - - // Store nodes in map for fast lookup. - for (ClusterNode n : nodes) - // Add nodes into hash circle, if absent. - lookup.add(resolveNode(n)); - - // Cleanup circle. - if (lookup.size() != addedNodes.size()) { - Collection<UUID> rmv = null; - - for (Iterator<UUID> iter = addedNodes.iterator(); iter.hasNext(); ) { - UUID id = iter.next(); - - if (!lookup.contains(id)) { - iter.remove(); - - if (rmv == null) - rmv = new ArrayList<>(); - - rmv.add(id); - } - } - - if (!F.isEmpty(rmv)) - nodeHash.removeNodes(rmv); - } - - UUID nodeId = nodeHash.node(obj, lookup); - - assert nodeId != null; - - ClusterNode node = ctx.projection().node(nodeId); - - if (node != null) - return node; - } - } - - /** - * Add node to hash circle if this is the first node invocation. - * - * @param n Node to get info for. - * @return Node ID. - */ - private UUID resolveNode(ClusterNode n) { - UUID nodeId = n.id(); - - if (!addedNodes.contains(nodeId)) { - addedNodes.add(nodeId); - - nodeHash.addNode(nodeId, REPLICA_CNT); - } - - return nodeId; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/streamer/router/StreamerCacheAffinityEventRouter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/router/StreamerCacheAffinityEventRouter.java b/modules/core/src/main/java/org/apache/ignite/streamer/router/StreamerCacheAffinityEventRouter.java deleted file mode 100644 index ddc8059..0000000 --- a/modules/core/src/main/java/org/apache/ignite/streamer/router/StreamerCacheAffinityEventRouter.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.streamer.router; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.processors.cache.*; -import org.apache.ignite.resources.*; -import org.apache.ignite.streamer.*; -import org.jetbrains.annotations.*; - -/** - * Router used to colocate streamer events with data stored in a partitioned cache. - * <h1 class="header">Affinity Key</h1> - * Affinity key for collocation of event together on the same node is specified - * via {@link CacheAffinityEvent#affinityKey()} method. If event does not implement - * {@link CacheAffinityEvent} interface, then event will be routed always to local node. - */ -public class StreamerCacheAffinityEventRouter extends StreamerEventRouterAdapter { - /** - * All events that implement this interface will be routed based on key affinity. - */ - @SuppressWarnings("PublicInnerClass") - public interface CacheAffinityEvent { - /** - * @return Affinity route key for the event. - */ - public Object affinityKey(); - - /** - * @return Cache name, if {@code null}, the default cache is used. - */ - @Nullable public String cacheName(); - } - - /** Grid instance. */ - @IgniteInstanceResource - private Ignite ignite; - - /** {@inheritDoc} */ - @Override public <T> ClusterNode route(StreamerContext ctx, String stageName, T evt) { - if (evt instanceof CacheAffinityEvent) { - CacheAffinityEvent e = (CacheAffinityEvent)evt; - - GridCache<Object, Object> c = ((IgniteEx) ignite).cachex(e.cacheName()); - - assert c != null; - - return c.affinity().mapKeyToNode(e.affinityKey()); - } - - return ignite.cluster().localNode(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/streamer/router/StreamerLocalEventRouter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/router/StreamerLocalEventRouter.java b/modules/core/src/main/java/org/apache/ignite/streamer/router/StreamerLocalEventRouter.java deleted file mode 100644 index e9a2f8d..0000000 --- a/modules/core/src/main/java/org/apache/ignite/streamer/router/StreamerLocalEventRouter.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.streamer.router; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.resources.*; -import org.apache.ignite.streamer.*; - -import java.util.*; - -/** - * Local router. Always routes event to local node. - */ -public class StreamerLocalEventRouter implements StreamerEventRouter { - /** Grid instance. */ - @IgniteInstanceResource - private Ignite ignite; - - /** {@inheritDoc} */ - @Override public <T> ClusterNode route(StreamerContext ctx, String stageName, T evt) { - return ignite.cluster().localNode(); - } - - /** {@inheritDoc} */ - @Override public <T> Map<ClusterNode, Collection<T>> route(StreamerContext ctx, String stageName, - Collection<T> evts) { - return F.asMap(ignite.cluster().localNode(), evts); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/streamer/router/StreamerRandomEventRouter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/router/StreamerRandomEventRouter.java b/modules/core/src/main/java/org/apache/ignite/streamer/router/StreamerRandomEventRouter.java deleted file mode 100644 index ce41944..0000000 --- a/modules/core/src/main/java/org/apache/ignite/streamer/router/StreamerRandomEventRouter.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.streamer.router; - -import org.apache.ignite.cluster.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.streamer.*; -import org.jdk8.backport.*; -import org.jetbrains.annotations.*; - -import java.util.*; - -/** - * Random router. Routes event to random node. - */ -public class StreamerRandomEventRouter extends StreamerEventRouterAdapter { - /** Optional predicates to exclude nodes from routing. */ - private IgnitePredicate<ClusterNode>[] predicates; - - /** - * Empty constructor for spring. - */ - public StreamerRandomEventRouter() { - this((IgnitePredicate<ClusterNode>[])null); - } - - /** - * Constructs random event router with optional set of filters to apply to streamer projection. - * - * @param predicates Node predicates. - */ - public StreamerRandomEventRouter(@Nullable IgnitePredicate<ClusterNode>... predicates) { - this.predicates = predicates; - } - - /** - * Constructs random event router with optional set of filters to apply to streamer projection. - * - * @param predicates Node predicates. - */ - @SuppressWarnings("unchecked") - public StreamerRandomEventRouter(Collection<IgnitePredicate<ClusterNode>> predicates) { - if (!F.isEmpty(predicates)) { - this.predicates = new IgnitePredicate[predicates.size()]; - - predicates.toArray(this.predicates); - } - } - - /** {@inheritDoc} */ - @Override public ClusterNode route(StreamerContext ctx, String stageName, Object evt) { - Collection<ClusterNode> nodes = F.view(ctx.projection().nodes(), predicates); - - if (F.isEmpty(nodes)) - return null; - - int idx = ThreadLocalRandom8.current().nextInt(nodes.size()); - - int i = 0; - - Iterator<ClusterNode> iter = nodes.iterator(); - - while (true) { - if (!iter.hasNext()) - iter = nodes.iterator(); - - ClusterNode node = iter.next(); - - if (idx == i++) - return node; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/streamer/router/StreamerRoundRobinEventRouter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/router/StreamerRoundRobinEventRouter.java b/modules/core/src/main/java/org/apache/ignite/streamer/router/StreamerRoundRobinEventRouter.java deleted file mode 100644 index 2471846..0000000 --- a/modules/core/src/main/java/org/apache/ignite/streamer/router/StreamerRoundRobinEventRouter.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.streamer.router; - -import org.apache.ignite.cluster.*; -import org.apache.ignite.streamer.*; - -import java.util.*; -import java.util.concurrent.atomic.*; - -/** - * Round robin router. - */ -public class StreamerRoundRobinEventRouter extends StreamerEventRouterAdapter { - /** */ - private final AtomicLong lastOrder = new AtomicLong(); - - /** {@inheritDoc} */ - @Override public ClusterNode route(StreamerContext ctx, String stageName, Object evt) { - Collection<ClusterNode> nodes = ctx.projection().nodes(); - - int idx = (int)(lastOrder.getAndIncrement() % nodes.size()); - - int i = 0; - - Iterator<ClusterNode> iter = nodes.iterator(); - - while (true) { - if (!iter.hasNext()) - iter = nodes.iterator(); - - ClusterNode node = iter.next(); - - if (idx == i++) - return node; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/streamer/router/package-info.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/router/package-info.java b/modules/core/src/main/java/org/apache/ignite/streamer/router/package-info.java deleted file mode 100644 index ad139e6..0000000 --- a/modules/core/src/main/java/org/apache/ignite/streamer/router/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Contains streamer event router implementations. - */ -package org.apache.ignite.streamer.router; \ No newline at end of file