ignite-gg9499 - fixes
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/1226b240 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/1226b240 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/1226b240 Branch: refs/heads/ignite-gg9499 Commit: 1226b2401b58e0e75d80e01787c0cee39ce5aeee Parents: 29823e6 Author: S.Vladykin <svlady...@gridgain.com> Authored: Fri Dec 19 15:09:04 2014 +0300 Committer: S.Vladykin <svlady...@gridgain.com> Committed: Fri Dec 19 15:09:04 2014 +0300 ---------------------------------------------------------------------- .../query/h2/twostep/GridMapQueryExecutor.java | 11 +- .../query/h2/twostep/GridMergeIndex.java | 116 ++++++++++++++----- .../h2/twostep/GridMergeIndexUnsorted.java | 66 ++++++----- .../query/h2/twostep/GridMergeTable.java | 4 +- .../h2/twostep/GridReduceQueryExecutor.java | 34 +++--- .../query/h2/twostep/GridResultPage.java | 53 +++------ .../twostep/messages/GridNextPageResponse.java | 29 ++++- .../cache/GridCacheCrossCacheQuerySelfTest.java | 34 +++++- 8 files changed, 240 insertions(+), 107 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1226b240/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMapQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMapQueryExecutor.java index b86caf1..92d1875 100644 --- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMapQueryExecutor.java +++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMapQueryExecutor.java @@ -211,19 +211,26 @@ public class GridMapQueryExecutor { assert res != null; + boolean last = false; + synchronized (res) { page = qr.pages[qry]++; for (int i = 0 ; i < pageSize; i++) { - if (!res.next()) + if (!res.next()) { + last = true; + break; + } rows.add(res.currentRow()); } } try { - ctx.io().sendUserMessage(F.asList(node), new GridNextPageResponse(qr.qryReqId, qry, page, allRows, rows)); + ctx.io().sendUserMessage(F.asList(node), + new GridNextPageResponse(qr.qryReqId, qry, page, allRows, last, rows), + GridTopic.TOPIC_QUERY, false, 0); } catch (IgniteCheckedException e) { log.error("Failed to send message.", e); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1226b240/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndex.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndex.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndex.java index 16ba15d..5d51b32 100644 --- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndex.java +++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndex.java @@ -10,11 +10,14 @@ package org.gridgain.grid.kernal.processors.query.h2.twostep; import org.apache.ignite.*; +import org.gridgain.grid.util.*; +import org.gridgain.grid.util.typedef.*; import org.h2.engine.*; import org.h2.index.*; import org.h2.message.*; import org.h2.result.*; import org.h2.table.*; +import org.jdk8.backport.*; import org.jetbrains.annotations.*; import java.util.*; @@ -25,16 +28,32 @@ import java.util.concurrent.atomic.*; */ public abstract class GridMergeIndex extends BaseIndex { /** */ + protected final GridResultPage<?> END = new GridResultPage<Object>(null, null); + + /** */ private static final int MAX_FETCH_SIZE = 100000; /** */ private final AtomicInteger cnt = new AtomicInteger(0); + /** Result sources. */ + private final AtomicInteger srcs = new AtomicInteger(0); + /** * Will be r/w from query execution thread only, does not need to be threadsafe. */ private ArrayList<Row> fetched = new ArrayList<>(); + /** + * @param tbl Table. + * @param name Index name. + * @param type Type. + * @param cols Columns. + */ + public GridMergeIndex(GridMergeTable tbl, String name, IndexType type, IndexColumn[] cols) { + initBaseIndex(tbl, 0, name, cols, type); + } + /** {@inheritDoc} */ @Override public long getRowCount(Session session) { return cnt.get(); @@ -46,6 +65,13 @@ public abstract class GridMergeIndex extends BaseIndex { } /** + * @param srcs Number of sources. + */ + public void setNumberOfSources(int srcs) { + this.srcs.set(srcs); + } + + /** * @param cnt Count. */ public void addCount(int cnt) { @@ -55,7 +81,26 @@ public abstract class GridMergeIndex extends BaseIndex { /** * @param page Page. */ - public abstract void addPage(GridResultPage<?> page); + public final void addPage(GridResultPage<?> page) { + if (!page.response().rows().isEmpty()) + addPage0(page); + else + assert page.response().isLast(); + + if (page.response().isLast()) { + int srcs0 = srcs.decrementAndGet(); + + assert srcs0 >= 0; + + if (srcs0 == 0) + addPage0(END); // We've fetched all. + } + } + + /** + * @param page Page. + */ + protected abstract void addPage0(GridResultPage<?> page); /** {@inheritDoc} */ @Override public Cursor find(Session session, SearchRow first, SearchRow last) { @@ -145,7 +190,7 @@ public abstract class GridMergeIndex extends BaseIndex { */ protected class IteratorCursor implements Cursor { /** */ - private Iterator<Row> iter; + protected Iterator<Row> iter; /** */ protected Row cur; @@ -185,47 +230,66 @@ public abstract class GridMergeIndex extends BaseIndex { /** * Fetching cursor. */ - protected abstract class FetchingCursor extends IteratorCursor { + protected class FetchingCursor extends IteratorCursor { /** */ - private boolean canFetch = true; + private Iterator<Row> stream; /** */ - public FetchingCursor() { - super(fetched == null ? Collections.<Row>emptyIterator() : fetched.iterator()); - } + public FetchingCursor(Iterator<Row> stream) { + super(new FetchedIterator()); - /** - * @return Next row or {@code null} if none available. - */ - @Nullable protected abstract Row fetchNext(); + assert stream != null; + + this.stream = stream; + } /** {@inheritDoc} */ @Override public boolean next() { - if (super.next()) + if (super.next()) { + assert cur != null; + + if (iter == stream && fetched != null) { // Cache fetched rows for reuse. + if (fetched.size() == MAX_FETCH_SIZE) + fetched = null; // Throw away fetched result if it is too large. + else + fetched.add(cur); + } + + X.println("__ row: " + Arrays.toString(cur.getValueList())); + return true; + } - if (!canFetch) + if (iter == stream) // We've fetched the stream. return false; - cur = fetchNext(); - - if (cur == null) { // No more results to fetch. - assert fetched == null || fetched.size() == cnt.get() : fetched.size() + " <> " + cnt.get(); + iter = stream; // Switch from cached to stream. - canFetch = false; + return next(); + } + } - return false; - } + /** + * List iterator without {@link ConcurrentModificationException}. + */ + private class FetchedIterator implements Iterator<Row> { + /** */ + private int idx; - if (fetched != null) { // Try to reuse fetched result. - fetched.add(cur); + /** {@inheritDoc} */ + @Override public boolean hasNext() { + return fetched != null && idx < fetched.size(); + } - if (fetched.size() == MAX_FETCH_SIZE) - fetched = null; // Throw away fetched result if it is too large. - } + /** {@inheritDoc} */ + @Override public Row next() { + return fetched.get(idx++); + } - return true; + /** {@inheritDoc} */ + @Override public void remove() { + throw new UnsupportedOperationException(); } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1226b240/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndexUnsorted.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndexUnsorted.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndexUnsorted.java index c6aaea9..b93c6ec 100644 --- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndexUnsorted.java +++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndexUnsorted.java @@ -10,8 +10,10 @@ package org.gridgain.grid.kernal.processors.query.h2.twostep; import org.apache.ignite.*; +import org.gridgain.grid.util.typedef.*; import org.h2.index.*; import org.h2.result.*; +import org.h2.table.*; import org.h2.value.*; import org.jetbrains.annotations.*; @@ -25,50 +27,60 @@ public class GridMergeIndexUnsorted extends GridMergeIndex { /** */ private final BlockingQueue<GridResultPage<?>> queue = new LinkedBlockingQueue<>(); + /** + * @param tbl Table. + * @param name Index name. + */ + public GridMergeIndexUnsorted(GridMergeTable tbl, String name) { + super(tbl, name, IndexType.createScan(false), IndexColumn.wrap(tbl.getColumns())); + } + /** {@inheritDoc} */ - @Override public void addPage(GridResultPage<?> page) { + @Override public void addPage0(GridResultPage<?> page) { queue.add(page); } /** {@inheritDoc} */ @Override protected Cursor findInStream(@Nullable SearchRow first, @Nullable SearchRow last) { - final GridResultPage<?> p = queue.poll(); - - assert p != null; // First page must be already fetched. + return new FetchingCursor(new Iterator<Row>() { + /** */ + Iterator<Value[]> iter = Collections.emptyIterator(); - if (p.isEmpty()) - return new IteratorCursor(Collections.<Row>emptyIterator()); + @Override public boolean hasNext() { + if (iter.hasNext()) + return true; - p.fetchNextPage(); // We always request next page before reading this one. + GridResultPage<?> page; - return new FetchingCursor() { - /** */ - Iterator<Value[]> iter = p.rows().iterator(); + try { + page = queue.take(); + } + catch (InterruptedException e) { + throw new IgniteException("Query execution was interrupted.", e); + } - @Nullable @Override protected Row fetchNext() { - if (!iter.hasNext()) { - GridResultPage<?> page; + if (page == END) { + assert queue.isEmpty() : "It must be the last page: " + queue; - try { - page = queue.take(); - } - catch (InterruptedException e) { - throw new IgniteException("Query execution was interrupted.", e); - } + return false; // We are done. + } - if (page.isEmpty()) { - assert queue.isEmpty() : "It must be the last page."; + page.fetchNextPage(); - return null; // Empty page - we are done. - } + iter = page.response().rows().iterator(); - page.fetchNextPage(); + assert iter.hasNext(); - iter = page.rows().iterator(); - } + return true; + } + @Override public Row next() { return new Row(iter.next(), 0); } - }; + + @Override public void remove() { + throw new UnsupportedOperationException(); + } + }); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1226b240/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeTable.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeTable.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeTable.java index a1f2213..683bb54 100644 --- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeTable.java +++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeTable.java @@ -27,7 +27,7 @@ public class GridMergeTable extends TableBase { private final ArrayList<Index> idxs = new ArrayList<>(1); /** */ - private final GridMergeIndex idx = new GridMergeIndexUnsorted(); + private final GridMergeIndex idx; /** * @param data Data. @@ -35,6 +35,8 @@ public class GridMergeTable extends TableBase { public GridMergeTable(CreateTableData data) { super(data); + idx = new GridMergeIndexUnsorted(this, "merge_scan"); + idxs.add(idx); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1226b240/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridReduceQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridReduceQueryExecutor.java index 3e7e12c..a12b4f9 100644 --- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridReduceQueryExecutor.java +++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridReduceQueryExecutor.java @@ -59,8 +59,8 @@ public class GridReduceQueryExecutor { // TODO handle node failure. - ctx.io().addMessageListener(GridTopic.TOPIC_QUERY, new GridMessageListener() { - @Override public void onMessage(UUID nodeId, Object msg) { + ctx.io().addUserMessageListener(GridTopic.TOPIC_QUERY, new IgniteBiPredicate<UUID, Object>() { + @Override public boolean apply(UUID nodeId, Object msg) { assert msg != null; ClusterNode node = ctx.discovery().node(nodeId); @@ -69,6 +69,8 @@ public class GridReduceQueryExecutor { onNextPage(node, (GridNextPageResponse)msg); else if (msg instanceof GridQueryFailResponse) onFail(node, (GridQueryFailResponse)msg); + + return true; } }); } @@ -86,7 +88,13 @@ public class GridReduceQueryExecutor { GridMergeIndex idx = r.tbls.get(msg.query()).getScanIndex(null); - idx.addPage(new GridResultPage<Object>(node.id(), msg.query(), msg.rows()) { + if (msg.allRows() != -1) { // Only the first page contains row count. + idx.addCount(msg.allRows()); + + r.latch.countDown(); + } + + idx.addPage(new GridResultPage<UUID>(node.id(), msg) { @Override public void fetchNextPage() { try { ctx.io().sendUserMessage(F.asList(node), new GridNextPageRequest(qryReqId, qry, pageSize)); @@ -96,12 +104,6 @@ public class GridReduceQueryExecutor { } } }); - - if (msg.allRows() != -1) { // Only the first page contains row count. - idx.addCount(msg.allRows()); - - r.latch.countDown(); - } } public IgniteFuture<GridCacheSqlResult> query(GridCacheTwoStepQuery qry) { @@ -118,17 +120,23 @@ public class GridReduceQueryExecutor { throw new IgniteException(e); } - for (GridCacheSqlQuery mapQry : qry.mapQueries()) - r.tbls.add(createTable(r.conn, mapQry)); - Collection<ClusterNode> nodes = ctx.grid().cluster().nodes(); // TODO filter nodes somehow? + for (GridCacheSqlQuery mapQry : qry.mapQueries()) { + GridMergeTable tbl = createTable(r.conn, mapQry); + + tbl.getScanIndex(null).setNumberOfSources(nodes.size()); + + r.tbls.add(tbl); + } + r.latch = new CountDownLatch(r.tbls.size() * nodes.size()); this.runs.put(qryReqId, r); try { - ctx.io().sendUserMessage(nodes, new GridQueryRequest(qryReqId, 1000, qry.mapQueries())); // TODO conf page size + ctx.io().sendUserMessage(nodes, new GridQueryRequest(qryReqId, 1000, qry.mapQueries()), // TODO conf page size + GridTopic.TOPIC_QUERY, false, 0); r.latch.await(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1226b240/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridResultPage.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridResultPage.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridResultPage.java index 6c6ab6a..ef52805 100644 --- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridResultPage.java +++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridResultPage.java @@ -9,68 +9,51 @@ package org.gridgain.grid.kernal.processors.query.h2.twostep; -import org.h2.result.*; -import org.h2.value.*; - -import java.util.*; +import org.gridgain.grid.kernal.processors.query.h2.twostep.messages.*; +import org.gridgain.grid.util.typedef.internal.*; /** * Page result. */ -public abstract class GridResultPage<S> { - /** */ - private final S src; - +public class GridResultPage<Z> { /** */ - private final Collection<Value[]> rows; + private final Z src; /** */ - private final int page; + private final GridNextPageResponse res; /** * @param src Source. - * @param page Page. - * @param rows Page rows. + * @param res Response. */ - protected GridResultPage(S src, int page, Collection<Value[]> rows) { - assert src != null; - assert rows != null; - + protected GridResultPage(Z src, GridNextPageResponse res) { this.src = src; - this.page = page; - this.rows = rows; + this.res = res; } /** * @return Result source. */ - public S source() { + public Z source() { return src; } /** - * @return Page. + * @return Response. */ - public int page() { - return page; + public GridNextPageResponse response() { + return res; } /** - * @return {@code true} If result is empty. + * Request next page. */ - public boolean isEmpty() { - return rows.isEmpty(); + public void fetchNextPage() { + throw new UnsupportedOperationException(); } - /** - * @return Page rows. - */ - public Collection<Value[]> rows() { - return rows; + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridResultPage.class, this); } - - /** - * Request next page. - */ - public abstract void fetchNextPage(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1226b240/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridNextPageResponse.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridNextPageResponse.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridNextPageResponse.java index de38172..0834f4c 100644 --- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridNextPageResponse.java +++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridNextPageResponse.java @@ -9,6 +9,7 @@ package org.gridgain.grid.kernal.processors.query.h2.twostep.messages; +import org.gridgain.grid.util.typedef.internal.*; import org.h2.store.*; import org.h2.value.*; @@ -34,20 +35,32 @@ public class GridNextPageResponse implements Externalizable { /** */ private Collection<Value[]> rows; + /** */ + private boolean last; + + /** + * For {@link Externalizable}. + */ + public GridNextPageResponse() { + // No-op. + } + /** * @param qryReqId Query request ID. * @param qry Query. * @param page Page. * @param allRows All rows count. + * @param last Last row. * @param rows Rows. */ - public GridNextPageResponse(long qryReqId, int qry, int page, int allRows, Collection<Value[]> rows) { + public GridNextPageResponse(long qryReqId, int qry, int page, int allRows, boolean last, Collection<Value[]> rows) { assert rows != null; this.qryReqId = qryReqId; this.qry = qry; this.page = page; this.allRows = allRows; + this.last = last; this.rows = rows; } @@ -80,6 +93,13 @@ public class GridNextPageResponse implements Externalizable { } /** + * @return {@code true} If this is the last page. + */ + public boolean isLast() { + return last; + } + + /** * @return Rows. */ public Collection<Value[]> rows() { @@ -91,6 +111,7 @@ public class GridNextPageResponse implements Externalizable { out.writeLong(qryReqId); out.writeInt(qry); out.writeInt(page); + out.writeBoolean(last); out.writeInt(allRows); out.writeInt(rows.size()); @@ -122,6 +143,7 @@ public class GridNextPageResponse implements Externalizable { qryReqId = in.readLong(); qry = in.readInt(); page = in.readInt(); + last = in.readBoolean(); allRows = in.readInt(); int rowCnt = in.readInt(); @@ -146,4 +168,9 @@ public class GridNextPageResponse implements Externalizable { } } } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridNextPageResponse.class, this); + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1226b240/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheCrossCacheQuerySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheCrossCacheQuerySelfTest.java b/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheCrossCacheQuerySelfTest.java index bc9dcf8..90a2d4c 100644 --- a/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheCrossCacheQuerySelfTest.java +++ b/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheCrossCacheQuerySelfTest.java @@ -18,6 +18,8 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; import org.gridgain.grid.cache.*; import org.gridgain.grid.cache.query.*; +import org.gridgain.grid.kernal.processors.cache.query.*; +import org.gridgain.grid.util.typedef.*; import org.gridgain.testframework.junits.common.*; import java.util.*; @@ -88,11 +90,37 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest { return cc; } + /** + * @throws Exception If failed. + */ + public void testTwoStep() throws Exception { + fillCaches(); + + GridCacheTwoStepQuery q = new GridCacheTwoStepQuery("select * from _cnts_"); + + q.addMapQuery("_cnts_", "select count(*) cnt from \"partitioned\".FactPurchase"); + + GridCacheQueriesEx<Integer, FactPurchase> qx = + (GridCacheQueriesEx<Integer, FactPurchase>)ignite.<Integer, FactPurchase>cache("partitioned").queries(); + + for (List<?> row : qx.execute(q).get()) + X.println("__ " + row); + + + +// Object cnt = .next().get(0); +// +// assertEquals(10L, cnt); + } + /** @throws Exception If failed. */ public void testOnProjection() throws Exception { + fillCaches(); + GridCacheProjection<Integer, FactPurchase> prj = ignite.<Integer, FactPurchase>cache("partitioned").projection( new IgnitePredicate<GridCacheEntry<Integer, FactPurchase>>() { - @Override public boolean apply(GridCacheEntry<Integer, FactPurchase> e) { + @Override + public boolean apply(GridCacheEntry<Integer, FactPurchase> e) { return e.getKey() > 12; } }); @@ -105,7 +133,9 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest { /** * @throws IgniteCheckedException If failed. */ - private void fillCaches() throws IgniteCheckedException { + private void fillCaches() throws IgniteCheckedException, InterruptedException { + awaitPartitionMapExchange(); + int idGen = 0; GridCache<Integer, Object> dimCache = ignite.cache("replicated");