http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoadResponse.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoadResponse.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoadResponse.java
deleted file mode 100644
index 0c863a9..0000000
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoadResponse.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.gridgain.grid.kernal.processors.dataload;
-
-import org.apache.ignite.internal.util.direct.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-
-import java.nio.*;
-
-/**
- *
- */
-public class GridDataLoadResponse extends GridTcpCommunicationMessageAdapter {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** */
-    private long reqId;
-
-    /** */
-    private byte[] errBytes;
-
-    /** */
-    private boolean forceLocDep;
-
-    /**
-     * @param reqId Request ID.
-     * @param errBytes Error bytes.
-     * @param forceLocDep Force local deployment.
-     */
-    public GridDataLoadResponse(long reqId, byte[] errBytes, boolean 
forceLocDep) {
-        this.reqId = reqId;
-        this.errBytes = errBytes;
-        this.forceLocDep = forceLocDep;
-    }
-
-    /**
-     * {@code Externalizable} support.
-     */
-    public GridDataLoadResponse() {
-        // No-op.
-    }
-
-    /**
-     * @return Request ID.
-     */
-    public long requestId() {
-        return reqId;
-    }
-
-    /**
-     * @return Error bytes.
-     */
-    public byte[] errorBytes() {
-        return errBytes;
-    }
-
-    /**
-     * @return {@code True} to force local deployment.
-     */
-    public boolean forceLocalDeployment() {
-        return forceLocDep;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridDataLoadResponse.class, this);
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
-    @Override public GridTcpCommunicationMessageAdapter clone() {
-        GridDataLoadResponse _clone = new GridDataLoadResponse();
-
-        clone0(_clone);
-
-        return _clone;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
-        GridDataLoadResponse _clone = (GridDataLoadResponse)_msg;
-
-        _clone.reqId = reqId;
-        _clone.errBytes = errBytes;
-        _clone.forceLocDep = forceLocDep;
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("all")
-    @Override public boolean writeTo(ByteBuffer buf) {
-        commState.setBuffer(buf);
-
-        if (!commState.typeWritten) {
-            if (!commState.putByte(directType()))
-                return false;
-
-            commState.typeWritten = true;
-        }
-
-        switch (commState.idx) {
-            case 0:
-                if (!commState.putByteArray(errBytes))
-                    return false;
-
-                commState.idx++;
-
-            case 1:
-                if (!commState.putBoolean(forceLocDep))
-                    return false;
-
-                commState.idx++;
-
-            case 2:
-                if (!commState.putLong(reqId))
-                    return false;
-
-                commState.idx++;
-
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("all")
-    @Override public boolean readFrom(ByteBuffer buf) {
-        commState.setBuffer(buf);
-
-        switch (commState.idx) {
-            case 0:
-                byte[] errBytes0 = commState.getByteArray();
-
-                if (errBytes0 == BYTE_ARR_NOT_READ)
-                    return false;
-
-                errBytes = errBytes0;
-
-                commState.idx++;
-
-            case 1:
-                if (buf.remaining() < 1)
-                    return false;
-
-                forceLocDep = commState.getBoolean();
-
-                commState.idx++;
-
-            case 2:
-                if (buf.remaining() < 8)
-                    return false;
-
-                reqId = commState.getLong();
-
-                commState.idx++;
-
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public byte directType() {
-        return 62;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoadUpdateJob.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoadUpdateJob.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoadUpdateJob.java
deleted file mode 100644
index 799a194..0000000
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoadUpdateJob.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.gridgain.grid.kernal.processors.dataload;
-
-import org.apache.ignite.*;
-import org.apache.ignite.dataload.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.util.lang.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-
-/**
- * Job to put entries to cache on affinity node.
- */
-class GridDataLoadUpdateJob<K, V> implements GridPlainCallable<Object> {
-    /** */
-    private final GridKernalContext ctx;
-
-    /** */
-    private final IgniteLogger log;
-
-    /** Cache name. */
-    private final String cacheName;
-
-    /** Entries to put. */
-    private final Collection<Map.Entry<K, V>> col;
-
-    /** {@code True} to ignore deployment ownership. */
-    private final boolean ignoreDepOwnership;
-
-    /** */
-    private final boolean skipStore;
-
-    /** */
-    private final IgniteDataLoadCacheUpdater<K, V> updater;
-
-    /**
-     * @param ctx Context.
-     * @param log Log.
-     * @param cacheName Cache name.
-     * @param col Entries to put.
-     * @param ignoreDepOwnership {@code True} to ignore deployment ownership.
-     * @param updater Updater.
-     */
-    GridDataLoadUpdateJob(
-        GridKernalContext ctx,
-        IgniteLogger log,
-        @Nullable String cacheName,
-        Collection<Map.Entry<K, V>> col,
-        boolean ignoreDepOwnership,
-        boolean skipStore,
-        IgniteDataLoadCacheUpdater<K, V> updater) {
-        this.ctx = ctx;
-        this.log = log;
-
-        assert col != null && !col.isEmpty();
-        assert updater != null;
-
-        this.cacheName = cacheName;
-        this.col = col;
-        this.ignoreDepOwnership = ignoreDepOwnership;
-        this.skipStore = skipStore;
-        this.updater = updater;
-    }
-
-    /** {@inheritDoc} */
-    @Override public Object call() throws Exception {
-        if (log.isDebugEnabled())
-            log.debug("Running put job [nodeId=" + ctx.localNodeId() + ", 
size=" + col.size() + ']');
-
-//        TODO IGNITE-77: restore adapter usage.
-//        GridCacheAdapter<Object, Object> cache = 
ctx.cache().internalCache(cacheName);
-//
-//        IgniteFuture<?> f = cache.context().preloader().startFuture();
-//
-//        if (!f.isDone())
-//            f.get();
-//
-//        if (ignoreDepOwnership)
-//            cache.context().deploy().ignoreOwnership(true);
-
-        IgniteCacheProxy<K, V> cache = ctx.cache().jcache(cacheName);
-
-        if (skipStore)
-            cache = (IgniteCacheProxy<K, V>)cache.withSkipStore();
-
-        if (ignoreDepOwnership)
-            cache.context().deploy().ignoreOwnership(true);
-
-        try {
-            updater.update(cache, col);
-
-            return null;
-        }
-        finally {
-            if (ignoreDepOwnership)
-                cache.context().deploy().ignoreOwnership(false);
-
-            if (log.isDebugEnabled())
-                log.debug("Update job finished on node: " + ctx.localNodeId());
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoaderFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoaderFuture.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoaderFuture.java
deleted file mode 100644
index 58cb726..0000000
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoaderFuture.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.gridgain.grid.kernal.processors.dataload;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.internal.util.future.*;
-import org.apache.ignite.internal.util.tostring.*;
-
-import java.io.*;
-
-/**
- * Data loader future.
- */
-class GridDataLoaderFuture extends GridFutureAdapter<Object> {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Data loader. */
-    @GridToStringExclude
-    private IgniteDataLoader dataLdr;
-
-    /**
-     * Default constructor for {@link Externalizable} support.
-     */
-    public GridDataLoaderFuture() {
-        // No-op.
-    }
-
-    /**
-     * @param ctx Context.
-     * @param dataLdr Data loader.
-     */
-    GridDataLoaderFuture(GridKernalContext ctx, IgniteDataLoader dataLdr) {
-        super(ctx);
-
-        assert dataLdr != null;
-
-        this.dataLdr = dataLdr;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean cancel() throws IgniteCheckedException {
-        checkValid();
-
-        if (onCancelled()) {
-            dataLdr.close(true);
-
-            return true;
-        }
-
-        return false;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridDataLoaderFuture.class, this, super.toString());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoaderProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoaderProcessor.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoaderProcessor.java
deleted file mode 100644
index 5c5e0cb..0000000
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoaderProcessor.java
+++ /dev/null
@@ -1,318 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.gridgain.grid.kernal.processors.dataload;
-
-import org.apache.ignite.*;
-import org.apache.ignite.dataload.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.processors.*;
-import org.apache.ignite.internal.util.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.marshaller.*;
-import org.apache.ignite.thread.*;
-import org.apache.ignite.internal.managers.communication.*;
-import org.apache.ignite.internal.managers.deployment.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.internal.util.worker.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-import java.util.concurrent.*;
-
-import static org.apache.ignite.internal.GridTopic.*;
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
-
-/**
- *
- */
-public class GridDataLoaderProcessor<K, V> extends GridProcessorAdapter {
-    /** Loaders map (access is not supposed to be highly concurrent). */
-    private Collection<IgniteDataLoaderImpl> ldrs = new 
GridConcurrentHashSet<>();
-
-    /** Busy lock. */
-    private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
-
-    /** Flushing thread. */
-    private Thread flusher;
-
-    /** */
-    private final DelayQueue<IgniteDataLoaderImpl<K, V>> flushQ = new 
DelayQueue<>();
-
-    /** Marshaller. */
-    private final IgniteMarshaller marsh;
-
-    /**
-     * @param ctx Kernal context.
-     */
-    public GridDataLoaderProcessor(GridKernalContext ctx) {
-        super(ctx);
-
-        ctx.io().addMessageListener(TOPIC_DATALOAD, new GridMessageListener() {
-            @Override public void onMessage(UUID nodeId, Object msg) {
-                assert msg instanceof GridDataLoadRequest;
-
-                processDataLoadRequest(nodeId, (GridDataLoadRequest)msg);
-            }
-        });
-
-        marsh = ctx.config().getMarshaller();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void start() throws IgniteCheckedException {
-        if (ctx.config().isDaemon())
-            return;
-
-        flusher = new IgniteThread(new GridWorker(ctx.gridName(), 
"grid-data-loader-flusher", log) {
-            @Override protected void body() throws InterruptedException, 
IgniteInterruptedException {
-                while (!isCancelled()) {
-                    IgniteDataLoaderImpl<K, V> ldr = flushQ.take();
-
-                    if (!busyLock.enterBusy())
-                        return;
-
-                    try {
-                        if (ldr.isClosed())
-                            continue;
-
-                        ldr.tryFlush();
-
-                        flushQ.offer(ldr);
-                    }
-                    finally {
-                        busyLock.leaveBusy();
-                    }
-                }
-            }
-        });
-
-        flusher.start();
-
-        if (log.isDebugEnabled())
-            log.debug("Started data loader processor.");
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onKernalStop(boolean cancel) {
-        if (ctx.config().isDaemon())
-            return;
-
-        ctx.io().removeMessageListener(TOPIC_DATALOAD);
-
-        busyLock.block();
-
-        U.interrupt(flusher);
-        U.join(flusher, log);
-
-        for (IgniteDataLoader<?, ?> ldr : ldrs) {
-            if (log.isDebugEnabled())
-                log.debug("Closing active data loader on grid stop [ldr=" + 
ldr + ", cancel=" + cancel + ']');
-
-            try {
-                ldr.close(cancel);
-            }
-            catch (IgniteInterruptedException e) {
-                U.warn(log, "Interrupted while waiting for completion of the 
data loader: " + ldr, e);
-            }
-            catch (IgniteCheckedException e) {
-                U.error(log, "Failed to close data loader: " + ldr, e);
-            }
-        }
-
-        if (log.isDebugEnabled())
-            log.debug("Stopped data loader processor.");
-    }
-
-    /**
-     * @param cacheName Cache name ({@code null} for default cache).
-     * @param compact {@code true} if data loader should transfer data in 
compact format.
-     * @return Data loader.
-     */
-    public IgniteDataLoader<K, V> dataLoader(@Nullable String cacheName, 
boolean compact) {
-        if (!busyLock.enterBusy())
-            throw new IllegalStateException("Failed to create data loader 
(grid is stopping).");
-
-        try {
-            final IgniteDataLoaderImpl<K, V> ldr = new 
IgniteDataLoaderImpl<>(ctx, cacheName, flushQ, compact);
-
-            ldrs.add(ldr);
-
-            ldr.future().listenAsync(new CI1<IgniteFuture<?>>() {
-                @Override public void apply(IgniteFuture<?> f) {
-                    boolean b = ldrs.remove(ldr);
-
-                    assert b : "Loader has not been added to set: " + ldr;
-
-                    if (log.isDebugEnabled())
-                        log.debug("Loader has been completed: " + ldr);
-                }
-            });
-
-            return ldr;
-        }
-        finally {
-            busyLock.leaveBusy();
-        }
-    }
-
-    /**
-     * @param cacheName Cache name ({@code null} for default cache).
-     * @return Data loader.
-     */
-    public IgniteDataLoader<K, V> dataLoader(@Nullable String cacheName) {
-        return dataLoader(cacheName, true);
-    }
-
-    /**
-     * @param nodeId Sender ID.
-     * @param req Request.
-     */
-    private void processDataLoadRequest(UUID nodeId, GridDataLoadRequest req) {
-        if (!busyLock.enterBusy()) {
-            if (log.isDebugEnabled())
-                log.debug("Ignoring data load request (node is stopping): " + 
req);
-
-            return;
-        }
-
-        try {
-            if (log.isDebugEnabled())
-                log.debug("Processing data load request: " + req);
-
-            Object topic;
-
-            try {
-                topic = marsh.unmarshal(req.responseTopicBytes(), null);
-            }
-            catch (IgniteCheckedException e) {
-                U.error(log, "Failed to unmarshal topic from request: " + req, 
e);
-
-                return;
-            }
-
-            ClassLoader clsLdr;
-
-            if (req.forceLocalDeployment())
-                clsLdr = U.gridClassLoader();
-            else {
-                GridDeployment dep = ctx.deploy().getGlobalDeployment(
-                    req.deploymentMode(),
-                    req.sampleClassName(),
-                    req.sampleClassName(),
-                    req.userVersion(),
-                    nodeId,
-                    req.classLoaderId(),
-                    req.participants(),
-                    null);
-
-                if (dep == null) {
-                    sendResponse(nodeId,
-                        topic,
-                        req.requestId(),
-                        new IgniteCheckedException("Failed to get deployment 
for request [sndId=" + nodeId +
-                            ", req=" + req + ']'),
-                        false);
-
-                    return;
-                }
-
-                clsLdr = dep.classLoader();
-            }
-
-            Collection<Map.Entry<K, V>> col;
-            IgniteDataLoadCacheUpdater<K, V> updater;
-
-            try {
-                col = marsh.unmarshal(req.collectionBytes(), clsLdr);
-                updater = marsh.unmarshal(req.updaterBytes(), clsLdr);
-            }
-            catch (IgniteCheckedException e) {
-                U.error(log, "Failed to unmarshal message [nodeId=" + nodeId + 
", req=" + req + ']', e);
-
-                sendResponse(nodeId, topic, req.requestId(), e, false);
-
-                return;
-            }
-
-            GridDataLoadUpdateJob<K, V> job = new GridDataLoadUpdateJob<>(ctx,
-                log,
-                req.cacheName(),
-                col,
-                req.ignoreDeploymentOwnership(),
-                req.skipStore(),
-                updater);
-
-            Exception err = null;
-
-            try {
-                job.call();
-            }
-            catch (Exception e) {
-                U.error(log, "Failed to finish update job.", e);
-
-                err = e;
-            }
-
-            sendResponse(nodeId, topic, req.requestId(), err, 
req.forceLocalDeployment());
-        }
-        finally {
-            busyLock.leaveBusy();
-        }
-    }
-
-    /**
-     * @param nodeId Node ID.
-     * @param resTopic Response topic.
-     * @param reqId Request ID.
-     * @param err Error.
-     * @param forceLocDep Force local deployment.
-     */
-    private void sendResponse(UUID nodeId, Object resTopic, long reqId, 
@Nullable Throwable err,
-        boolean forceLocDep) {
-        byte[] errBytes;
-
-        try {
-            errBytes = err != null ? marsh.marshal(err) : null;
-        }
-        catch (IgniteCheckedException e) {
-            U.error(log, "Failed to marshal message.", e);
-
-            return;
-        }
-
-        GridDataLoadResponse res = new GridDataLoadResponse(reqId, errBytes, 
forceLocDep);
-
-        try {
-            ctx.io().send(nodeId, resTopic, res, PUBLIC_POOL);
-        }
-        catch (IgniteCheckedException e) {
-            if (ctx.discovery().alive(nodeId))
-                U.error(log, "Failed to respond to node [nodeId=" + nodeId + 
", res=" + res + ']', e);
-            else if (log.isDebugEnabled())
-                log.debug("Node has left the grid: " + nodeId);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void printMemoryStats() {
-        X.println(">>>");
-        X.println(">>> Data loader processor memory stats [grid=" + 
ctx.gridName() + ']');
-        X.println(">>>   ldrsSize: " + ldrs.size());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/IgniteDataLoaderImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/IgniteDataLoaderImpl.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/IgniteDataLoaderImpl.java
deleted file mode 100644
index 9f849db..0000000
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/IgniteDataLoaderImpl.java
+++ /dev/null
@@ -1,1346 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.gridgain.grid.kernal.processors.dataload;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.dataload.*;
-import org.apache.ignite.events.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.util.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.product.*;
-import org.apache.ignite.internal.managers.communication.*;
-import org.apache.ignite.internal.managers.deployment.*;
-import org.apache.ignite.internal.managers.eventstorage.*;
-import org.gridgain.grid.kernal.processors.cache.*;
-import org.apache.ignite.internal.util.future.*;
-import org.apache.ignite.internal.util.lang.*;
-import org.apache.ignite.internal.util.tostring.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.jdk8.backport.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.util.*;
-import java.util.Map.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.*;
-
-import static org.apache.ignite.events.IgniteEventType.*;
-import static org.apache.ignite.internal.GridNodeAttributes.*;
-import static org.apache.ignite.internal.GridTopic.*;
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
-
-/**
- * Data loader implementation.
- */
-public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, 
Delayed {
-    /** */
-    public static final IgniteProductVersion COMPACT_MAP_ENTRIES_SINCE = 
IgniteProductVersion.fromString("1.0.0");
-
-    /** Cache updater. */
-    private IgniteDataLoadCacheUpdater<K, V> updater = 
GridDataLoadCacheUpdaters.individual();
-
-    /** */
-    private byte[] updaterBytes;
-
-    /** Max remap count before issuing an error. */
-    private static final int MAX_REMAP_CNT = 32;
-
-    /** Log reference. */
-    private static final AtomicReference<IgniteLogger> logRef = new 
AtomicReference<>();
-
-    /** Cache name ({@code null} for default cache). */
-    private final String cacheName;
-
-    /** Portable enabled flag. */
-    private final boolean portableEnabled;
-
-    /**
-     *  If {@code true} then data will be transferred in compact format (only 
keys and values).
-     *  Otherwise full map entry will be transferred (this is requires by DR 
internal logic).
-     */
-    private final boolean compact;
-
-    /** Per-node buffer size. */
-    @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
-    private int bufSize = DFLT_PER_NODE_BUFFER_SIZE;
-
-    /** */
-    private int parallelOps = DFLT_MAX_PARALLEL_OPS;
-
-    /** */
-    private long autoFlushFreq;
-
-    /** Mapping. */
-    @GridToStringInclude
-    private ConcurrentMap<UUID, Buffer> bufMappings = new 
ConcurrentHashMap8<>();
-
-    /** Logger. */
-    private IgniteLogger log;
-
-    /** Discovery listener. */
-    private final GridLocalEventListener discoLsnr;
-
-    /** Context. */
-    private final GridKernalContext ctx;
-
-    /** Communication topic for responses. */
-    private final Object topic;
-
-    /** */
-    private byte[] topicBytes;
-
-    /** {@code True} if data loader has been cancelled. */
-    private volatile boolean cancelled;
-
-    /** Active futures of this data loader. */
-    @GridToStringInclude
-    private final Collection<IgniteFuture<?>> activeFuts = new 
GridConcurrentHashSet<>();
-
-    /** Closure to remove from active futures. */
-    @GridToStringExclude
-    private final IgniteInClosure<IgniteFuture<?>> rmvActiveFut = new 
IgniteInClosure<IgniteFuture<?>>() {
-        @Override public void apply(IgniteFuture<?> t) {
-            boolean rmv = activeFuts.remove(t);
-
-            assert rmv;
-        }
-    };
-
-    /** Job peer deploy aware. */
-    private volatile GridPeerDeployAware jobPda;
-
-    /** Deployment class. */
-    private Class<?> depCls;
-
-    /** Future to track loading finish. */
-    private final GridFutureAdapter<?> fut;
-
-    /** Busy lock. */
-    private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
-
-    /** Closed flag. */
-    private final AtomicBoolean closed = new AtomicBoolean();
-
-    /** */
-    private volatile long lastFlushTime = U.currentTimeMillis();
-
-    /** */
-    private final DelayQueue<IgniteDataLoaderImpl<K, V>> flushQ;
-
-    /** */
-    private boolean skipStore;
-
-    /**
-     * @param ctx Grid kernal context.
-     * @param cacheName Cache name.
-     * @param flushQ Flush queue.
-     * @param compact If {@code true} data is transferred in compact mode 
(only keys and values).
-     *                Otherwise full map entry will be transferred (this is 
required by DR internal logic).
-     */
-    public IgniteDataLoaderImpl(
-        final GridKernalContext ctx,
-        @Nullable final String cacheName,
-        DelayQueue<IgniteDataLoaderImpl<K, V>> flushQ,
-        boolean compact
-    ) {
-        assert ctx != null;
-
-        this.ctx = ctx;
-        this.cacheName = cacheName;
-        this.flushQ = flushQ;
-        this.compact = compact;
-
-        log = U.logger(ctx, logRef, IgniteDataLoaderImpl.class);
-
-        ClusterNode node = F.first(ctx.grid().forCache(cacheName).nodes());
-
-        if (node == null)
-            throw new IllegalStateException("Cache doesn't exist: " + 
cacheName);
-
-        Map<String, Boolean> attrPortable = 
node.attribute(ATTR_CACHE_PORTABLE);
-
-        Boolean portableEnabled0 = attrPortable == null ? null : 
attrPortable.get(CU.mask(cacheName));
-
-        portableEnabled = portableEnabled0 == null ? false : portableEnabled0;
-
-        discoLsnr = new GridLocalEventListener() {
-            @Override public void onEvent(IgniteEvent evt) {
-                assert evt.type() == EVT_NODE_FAILED || evt.type() == 
EVT_NODE_LEFT;
-
-                IgniteDiscoveryEvent discoEvt = (IgniteDiscoveryEvent)evt;
-
-                UUID id = discoEvt.eventNode().id();
-
-                // Remap regular mappings.
-                final Buffer buf = bufMappings.remove(id);
-
-                if (buf != null) {
-                    // Only async notification is possible since
-                    // discovery thread may be trapped otherwise.
-                    ctx.closure().callLocalSafe(
-                        new Callable<Object>() {
-                            @Override public Object call() throws Exception {
-                                buf.onNodeLeft();
-
-                                return null;
-                            }
-                        },
-                        true /* system pool */
-                    );
-                }
-            }
-        };
-
-        ctx.event().addLocalEventListener(discoLsnr, EVT_NODE_FAILED, 
EVT_NODE_LEFT);
-
-        // Generate unique topic for this loader.
-        topic = TOPIC_DATALOAD.topic(IgniteUuid.fromUuid(ctx.localNodeId()));
-
-        ctx.io().addMessageListener(topic, new GridMessageListener() {
-            @Override public void onMessage(UUID nodeId, Object msg) {
-                assert msg instanceof GridDataLoadResponse;
-
-                GridDataLoadResponse res = (GridDataLoadResponse)msg;
-
-                if (log.isDebugEnabled())
-                    log.debug("Received data load response: " + res);
-
-                Buffer buf = bufMappings.get(nodeId);
-
-                if (buf != null)
-                    buf.onResponse(res);
-
-                else if (log.isDebugEnabled())
-                    log.debug("Ignoring response since node has left [nodeId=" 
+ nodeId + ", ");
-            }
-        });
-
-        if (log.isDebugEnabled())
-            log.debug("Added response listener within topic: " + topic);
-
-        fut = new GridDataLoaderFuture(ctx, this);
-    }
-
-    /**
-     * Enters busy lock.
-     */
-    private void enterBusy() {
-        if (!busyLock.enterBusy())
-            throw new IllegalStateException("Data loader has been closed.");
-    }
-
-    /**
-     * Leaves busy lock.
-     */
-    private void leaveBusy() {
-        busyLock.leaveBusy();
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteFuture<?> future() {
-        return fut;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void deployClass(Class<?> depCls) {
-        this.depCls = depCls;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void updater(IgniteDataLoadCacheUpdater<K, V> updater) {
-        A.notNull(updater, "updater");
-
-        this.updater = updater;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean isolated() {
-        return updater != GridDataLoadCacheUpdaters.individual();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void isolated(boolean isolated) throws 
IgniteCheckedException {
-        if (isolated())
-            return;
-
-        ClusterNode node = F.first(ctx.grid().forCache(cacheName).nodes());
-
-        if (node == null)
-            throw new IgniteCheckedException("Failed to get node for cache: " 
+ cacheName);
-
-        GridCacheAttributes a = U.cacheAttributes(node, cacheName);
-
-        assert a != null;
-
-        updater = a.atomicityMode() == GridCacheAtomicityMode.ATOMIC ?
-            GridDataLoadCacheUpdaters.<K, V>batched() :
-            GridDataLoadCacheUpdaters.<K, V>groupLocked();
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean skipStore() {
-        return skipStore;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void skipStore(boolean skipStore) {
-        this.skipStore = skipStore;
-    }
-
-    /** {@inheritDoc} */
-    @Override @Nullable public String cacheName() {
-        return cacheName;
-    }
-
-    /** {@inheritDoc} */
-    @Override public int perNodeBufferSize() {
-        return bufSize;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void perNodeBufferSize(int bufSize) {
-        A.ensure(bufSize > 0, "bufSize > 0");
-
-        this.bufSize = bufSize;
-    }
-
-    /** {@inheritDoc} */
-    @Override public int perNodeParallelLoadOperations() {
-        return parallelOps;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void perNodeParallelLoadOperations(int parallelOps) {
-        this.parallelOps = parallelOps;
-    }
-
-    /** {@inheritDoc} */
-    @Override public long autoFlushFrequency() {
-        return autoFlushFreq;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void autoFlushFrequency(long autoFlushFreq) {
-        A.ensure(autoFlushFreq >= 0, "autoFlushFreq >= 0");
-
-        long old = this.autoFlushFreq;
-
-        if (autoFlushFreq != old) {
-            this.autoFlushFreq = autoFlushFreq;
-
-            if (autoFlushFreq != 0 && old == 0)
-                flushQ.add(this);
-            else if (autoFlushFreq == 0)
-                flushQ.remove(this);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteFuture<?> addData(Map<K, V> entries) throws 
IllegalStateException {
-        A.notNull(entries, "entries");
-
-        return addData(entries.entrySet());
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteFuture<?> addData(Collection<? extends Map.Entry<K, 
V>> entries) {
-        A.notEmpty(entries, "entries");
-
-        enterBusy();
-
-        try {
-            GridFutureAdapter<Object> resFut = new GridFutureAdapter<>(ctx);
-
-            activeFuts.add(resFut);
-
-            resFut.listenAsync(rmvActiveFut);
-
-            Collection<K> keys = new GridConcurrentHashSet<>(entries.size(), 
1.0f, 16);
-
-            for (Map.Entry<K, V> entry : entries)
-                keys.add(entry.getKey());
-
-            load0(entries, resFut, keys, 0);
-
-            return resFut;
-        }
-        catch (IgniteException e) {
-            return new GridFinishedFuture<>(ctx, e);
-        }
-        finally {
-            leaveBusy();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteFuture<?> addData(Map.Entry<K, V> entry) throws 
IgniteCheckedException, IllegalStateException {
-        A.notNull(entry, "entry");
-
-        return addData(F.asList(entry));
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteFuture<?> addData(K key, V val) throws 
IgniteCheckedException, IllegalStateException {
-        A.notNull(key, "key");
-
-        return addData(new Entry0<>(key, val));
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteFuture<?> removeData(K key) throws 
IgniteCheckedException, IllegalStateException {
-        return addData(key, null);
-    }
-
-    /**
-     * @param entries Entries.
-     * @param resFut Result future.
-     * @param activeKeys Active keys.
-     * @param remaps Remaps count.
-     */
-    private void load0(
-        Collection<? extends Map.Entry<K, V>> entries,
-        final GridFutureAdapter<Object> resFut,
-        final Collection<K> activeKeys,
-        final int remaps
-    ) {
-        assert entries != null;
-
-        if (remaps >= MAX_REMAP_CNT) {
-            resFut.onDone(new IgniteCheckedException("Failed to finish 
operation (too many remaps): " + remaps));
-
-            return;
-        }
-
-        Map<ClusterNode, Collection<Map.Entry<K, V>>> mappings = new 
HashMap<>();
-
-        boolean initPda = ctx.deploy().enabled() && jobPda == null;
-
-        for (Map.Entry<K, V> entry : entries) {
-            ClusterNode node;
-
-            try {
-                K key = entry.getKey();
-
-                assert key != null;
-
-                if (initPda) {
-                    jobPda = new DataLoaderPda(key, entry.getValue(), updater);
-
-                    initPda = false;
-                }
-
-                node = ctx.affinity().mapKeyToNode(cacheName, key);
-            }
-            catch (IgniteCheckedException e) {
-                resFut.onDone(e);
-
-                return;
-            }
-
-            if (node == null) {
-                resFut.onDone(new ClusterTopologyException("Failed to map key 
to node " +
-                    "(no nodes with cache found in topology) [infos=" + 
entries.size() +
-                    ", cacheName=" + cacheName + ']'));
-
-                return;
-            }
-
-            Collection<Map.Entry<K, V>> col = mappings.get(node);
-
-            if (col == null)
-                mappings.put(node, col = new ArrayList<>());
-
-            col.add(entry);
-        }
-
-        for (final Map.Entry<ClusterNode, Collection<Map.Entry<K, V>>> e : 
mappings.entrySet()) {
-            final UUID nodeId = e.getKey().id();
-
-            Buffer buf = bufMappings.get(nodeId);
-
-            if (buf == null) {
-                Buffer old = bufMappings.putIfAbsent(nodeId, buf = new 
Buffer(e.getKey()));
-
-                if (old != null)
-                    buf = old;
-            }
-
-            final Collection<Map.Entry<K, V>> entriesForNode = e.getValue();
-
-            IgniteInClosure<IgniteFuture<?>> lsnr = new 
IgniteInClosure<IgniteFuture<?>>() {
-                @Override public void apply(IgniteFuture<?> t) {
-                    try {
-                        t.get();
-
-                        for (Map.Entry<K, V> e : entriesForNode)
-                            activeKeys.remove(e.getKey());
-
-                        if (activeKeys.isEmpty())
-                            resFut.onDone();
-                    }
-                    catch (IgniteCheckedException e1) {
-                        if (log.isDebugEnabled())
-                            log.debug("Future finished with error [nodeId=" + 
nodeId + ", err=" + e1 + ']');
-
-                        if (cancelled) {
-                            resFut.onDone(new IgniteCheckedException("Data 
loader has been cancelled: " +
-                                IgniteDataLoaderImpl.this, e1));
-                        }
-                        else
-                            load0(entriesForNode, resFut, activeKeys, remaps + 
1);
-                    }
-                }
-            };
-
-            GridFutureAdapter<?> f;
-
-            try {
-                f = buf.update(entriesForNode, lsnr);
-            }
-            catch (IgniteInterruptedException e1) {
-                resFut.onDone(e1);
-
-                return;
-            }
-
-            if (ctx.discovery().node(nodeId) == null) {
-                if (bufMappings.remove(nodeId, buf))
-                    buf.onNodeLeft();
-
-                if (f != null)
-                    f.onDone(new ClusterTopologyException("Failed to wait for 
request completion " +
-                        "(node has left): " + nodeId));
-            }
-        }
-    }
-
-    /**
-     * Performs flush.
-     *
-     * @throws IgniteCheckedException If failed.
-     */
-    private void doFlush() throws IgniteCheckedException {
-        lastFlushTime = U.currentTimeMillis();
-
-        List<IgniteFuture> activeFuts0 = null;
-
-        int doneCnt = 0;
-
-        for (IgniteFuture<?> f : activeFuts) {
-            if (!f.isDone()) {
-                if (activeFuts0 == null)
-                    activeFuts0 = new ArrayList<>((int)(activeFuts.size() * 
1.2));
-
-                activeFuts0.add(f);
-            }
-            else {
-                f.get();
-
-                doneCnt++;
-            }
-        }
-
-        if (activeFuts0 == null || activeFuts0.isEmpty())
-            return;
-
-        while (true) {
-            Queue<IgniteFuture<?>> q = null;
-
-            for (Buffer buf : bufMappings.values()) {
-                IgniteFuture<?> flushFut = buf.flush();
-
-                if (flushFut != null) {
-                    if (q == null)
-                        q = new ArrayDeque<>(bufMappings.size() * 2);
-
-                    q.add(flushFut);
-                }
-            }
-
-            if (q != null) {
-                assert !q.isEmpty();
-
-                boolean err = false;
-
-                for (IgniteFuture fut = q.poll(); fut != null; fut = q.poll()) 
{
-                    try {
-                        fut.get();
-                    }
-                    catch (IgniteCheckedException e) {
-                        if (log.isDebugEnabled())
-                            log.debug("Failed to flush buffer: " + e);
-
-                        err = true;
-                    }
-                }
-
-                if (err)
-                    // Remaps needed - flush buffers.
-                    continue;
-            }
-
-            doneCnt = 0;
-
-            for (int i = 0; i < activeFuts0.size(); i++) {
-                IgniteFuture f = activeFuts0.get(i);
-
-                if (f == null)
-                    doneCnt++;
-                else if (f.isDone()) {
-                    f.get();
-
-                    doneCnt++;
-
-                    activeFuts0.set(i, null);
-                }
-                else
-                    break;
-            }
-
-            if (doneCnt == activeFuts0.size())
-                return;
-        }
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("ForLoopReplaceableByForEach")
-    @Override public void flush() throws IgniteCheckedException {
-        enterBusy();
-
-        try {
-            doFlush();
-        }
-        finally {
-            leaveBusy();
-        }
-    }
-
-    /**
-     * Flushes every internal buffer if buffer was flushed before passed in
-     * threshold.
-     * <p>
-     * Does not wait for result and does not fail on errors assuming that this 
method
-     * should be called periodically.
-     */
-    @Override public void tryFlush() throws IgniteInterruptedException {
-        if (!busyLock.enterBusy())
-            return;
-
-        try {
-            for (Buffer buf : bufMappings.values())
-                buf.flush();
-
-            lastFlushTime = U.currentTimeMillis();
-        }
-        finally {
-            leaveBusy();
-        }
-    }
-
-    /**
-     * @param cancel {@code True} to close with cancellation.
-     * @throws IgniteCheckedException If failed.
-     */
-    @Override public void close(boolean cancel) throws IgniteCheckedException {
-        if (!closed.compareAndSet(false, true))
-            return;
-
-        busyLock.block();
-
-        if (log.isDebugEnabled())
-            log.debug("Closing data loader [ldr=" + this + ", cancel=" + 
cancel + ']');
-
-        IgniteCheckedException e = null;
-
-        try {
-            // Assuming that no methods are called on this loader after this 
method is called.
-            if (cancel) {
-                cancelled = true;
-
-                for (Buffer buf : bufMappings.values())
-                    buf.cancelAll();
-            }
-            else
-                doFlush();
-
-            ctx.event().removeLocalEventListener(discoLsnr);
-
-            ctx.io().removeMessageListener(topic);
-        }
-        catch (IgniteCheckedException e0) {
-            e = e0;
-        }
-
-        fut.onDone(null, e);
-
-        if (e != null)
-            throw e;
-    }
-
-    /**
-     * @return {@code true} If the loader is closed.
-     */
-    boolean isClosed() {
-        return fut.isDone();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void close() throws IgniteCheckedException {
-        close(false);
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(IgniteDataLoaderImpl.class, this);
-    }
-
-    /** {@inheritDoc} */
-    @Override public long getDelay(TimeUnit unit) {
-        return unit.convert(nextFlushTime() - U.currentTimeMillis(), 
TimeUnit.MILLISECONDS);
-    }
-
-    /**
-     * @return Next flush time.
-     */
-    private long nextFlushTime() {
-        return lastFlushTime + autoFlushFreq;
-    }
-
-    /** {@inheritDoc} */
-    @Override public int compareTo(Delayed o) {
-        return nextFlushTime() > ((IgniteDataLoaderImpl)o).nextFlushTime() ? 1 
: -1;
-    }
-
-    /**
-     *
-     */
-    private class Buffer {
-        /** Node. */
-        private final ClusterNode node;
-
-        /** Active futures. */
-        private final Collection<IgniteFuture<Object>> locFuts;
-
-        /** Buffered entries. */
-        private List<Map.Entry<K, V>> entries;
-
-        /** */
-        @GridToStringExclude
-        private GridFutureAdapter<Object> curFut;
-
-        /** Local node flag. */
-        private final boolean isLocNode;
-
-        /** ID generator. */
-        private final AtomicLong idGen = new AtomicLong();
-
-        /** Active futures. */
-        private final ConcurrentMap<Long, GridFutureAdapter<Object>> reqs;
-
-        /** */
-        private final Semaphore sem;
-
-        /** Closure to signal on task finish. */
-        @GridToStringExclude
-        private final IgniteInClosure<IgniteFuture<Object>> signalC = new 
IgniteInClosure<IgniteFuture<Object>>() {
-            @Override public void apply(IgniteFuture<Object> t) {
-                signalTaskFinished(t);
-            }
-        };
-
-        /**
-         * @param node Node.
-         */
-        Buffer(ClusterNode node) {
-            assert node != null;
-
-            this.node = node;
-
-            locFuts = new GridConcurrentHashSet<>();
-            reqs = new ConcurrentHashMap8<>();
-
-            // Cache local node flag.
-            isLocNode = node.equals(ctx.discovery().localNode());
-
-            entries = newEntries();
-            curFut = new GridFutureAdapter<>(ctx);
-            curFut.listenAsync(signalC);
-
-            sem = new Semaphore(parallelOps);
-        }
-
-        /**
-         * @param newEntries Infos.
-         * @param lsnr Listener for the operation future.
-         * @throws org.apache.ignite.IgniteInterruptedException If failed.
-         * @return Future for operation.
-         */
-        @Nullable GridFutureAdapter<?> update(Iterable<Map.Entry<K, V>> 
newEntries,
-            IgniteInClosure<IgniteFuture<?>> lsnr) throws 
IgniteInterruptedException {
-            List<Map.Entry<K, V>> entries0 = null;
-            GridFutureAdapter<Object> curFut0;
-
-            synchronized (this) {
-                curFut0 = curFut;
-
-                curFut0.listenAsync(lsnr);
-
-                for (Map.Entry<K, V> entry : newEntries)
-                    entries.add(entry);
-
-                if (entries.size() >= bufSize) {
-                    entries0 = entries;
-
-                    entries = newEntries();
-                    curFut = new GridFutureAdapter<>(ctx);
-                    curFut.listenAsync(signalC);
-                }
-            }
-
-            if (entries0 != null) {
-                submit(entries0, curFut0);
-
-                if (cancelled)
-                    curFut0.onDone(new IgniteCheckedException("Data loader has 
been cancelled: " + IgniteDataLoaderImpl.this));
-            }
-
-            return curFut0;
-        }
-
-        /**
-         * @return Fresh collection with some space for outgrowth.
-         */
-        private List<Map.Entry<K, V>> newEntries() {
-            return new ArrayList<>((int)(bufSize * 1.2));
-        }
-
-        /**
-         * @return Future if any submitted.
-         *
-         * @throws org.apache.ignite.IgniteInterruptedException If thread has 
been interrupted.
-         */
-        @Nullable
-        IgniteFuture<?> flush() throws IgniteInterruptedException {
-            List<Map.Entry<K, V>> entries0 = null;
-            GridFutureAdapter<Object> curFut0 = null;
-
-            synchronized (this) {
-                if (!entries.isEmpty()) {
-                    entries0 = entries;
-                    curFut0 = curFut;
-
-                    entries = newEntries();
-                    curFut = new GridFutureAdapter<>(ctx);
-                    curFut.listenAsync(signalC);
-                }
-            }
-
-            if (entries0 != null)
-                submit(entries0, curFut0);
-
-            // Create compound future for this flush.
-            GridCompoundFuture<Object, Object> res = null;
-
-            for (IgniteFuture<Object> f : locFuts) {
-                if (res == null)
-                    res = new GridCompoundFuture<>(ctx);
-
-                res.add(f);
-            }
-
-            for (IgniteFuture<Object> f : reqs.values()) {
-                if (res == null)
-                    res = new GridCompoundFuture<>(ctx);
-
-                res.add(f);
-            }
-
-            if (res != null)
-                res.markInitialized();
-
-            return res;
-        }
-
-        /**
-         * Increments active tasks count.
-         *
-         * @throws org.apache.ignite.IgniteInterruptedException If thread has 
been interrupted.
-         */
-        private void incrementActiveTasks() throws IgniteInterruptedException {
-            U.acquire(sem);
-        }
-
-        /**
-         * @param f Future that finished.
-         */
-        private void signalTaskFinished(IgniteFuture<Object> f) {
-            assert f != null;
-
-            sem.release();
-        }
-
-        /**
-         * @param entries Entries to submit.
-         * @param curFut Current future.
-         * @throws org.apache.ignite.IgniteInterruptedException If interrupted.
-         */
-        private void submit(final Collection<Map.Entry<K, V>> entries, final 
GridFutureAdapter<Object> curFut)
-            throws IgniteInterruptedException {
-            assert entries != null;
-            assert !entries.isEmpty();
-            assert curFut != null;
-
-            incrementActiveTasks();
-
-            IgniteFuture<Object> fut;
-
-            if (isLocNode) {
-                fut = ctx.closure().callLocalSafe(
-                    new GridDataLoadUpdateJob<>(ctx, log, cacheName, entries, 
false, skipStore, updater), false);
-
-                locFuts.add(fut);
-
-                fut.listenAsync(new IgniteInClosure<IgniteFuture<Object>>() {
-                    @Override public void apply(IgniteFuture<Object> t) {
-                        try {
-                            boolean rmv = locFuts.remove(t);
-
-                            assert rmv;
-
-                            curFut.onDone(t.get());
-                        }
-                        catch (IgniteCheckedException e) {
-                            curFut.onDone(e);
-                        }
-                    }
-                });
-            }
-            else {
-                byte[] entriesBytes;
-
-                try {
-                    if (compact) {
-                        if 
(node.version().compareTo(COMPACT_MAP_ENTRIES_SINCE) < 0) {
-                            Collection<Map.Entry<K, V>> entries0 = new 
ArrayList<>(entries.size());
-
-                            GridPortableProcessor portable = ctx.portable();
-
-                            for (Map.Entry<K, V> entry : entries)
-                                entries0.add(new Entry0<>(
-                                    portableEnabled ? 
(K)portable.marshalToPortable(entry.getKey()) : entry.getKey(),
-                                    portableEnabled ? 
(V)portable.marshalToPortable(entry.getValue()) : entry.getValue()));
-
-                            entriesBytes = 
ctx.config().getMarshaller().marshal(entries0);
-                        }
-                        else
-                            entriesBytes = ctx.config().getMarshaller()
-                                .marshal(new Entries0<>(entries, 
portableEnabled ? ctx.portable() : null));
-                    }
-                    else
-                        entriesBytes = 
ctx.config().getMarshaller().marshal(entries);
-
-                    if (updaterBytes == null) {
-                        assert updater != null;
-
-                        updaterBytes = 
ctx.config().getMarshaller().marshal(updater);
-                    }
-
-                    if (topicBytes == null)
-                        topicBytes = 
ctx.config().getMarshaller().marshal(topic);
-                }
-                catch (IgniteCheckedException e) {
-                    U.error(log, "Failed to marshal (request will not be 
sent).", e);
-
-                    return;
-                }
-
-                GridDeployment dep = null;
-                GridPeerDeployAware jobPda0 = null;
-
-                if (ctx.deploy().enabled()) {
-                    try {
-                        jobPda0 = jobPda;
-
-                        assert jobPda0 != null;
-
-                        dep = ctx.deploy().deploy(jobPda0.deployClass(), 
jobPda0.classLoader());
-
-                        GridCacheAdapter<Object, Object> cache = 
ctx.cache().internalCache(cacheName);
-
-                        if (cache != null)
-                            cache.context().deploy().onEnter();
-                    }
-                    catch (IgniteCheckedException e) {
-                        U.error(log, "Failed to deploy class (request will not 
be sent): " + jobPda0.deployClass(), e);
-
-                        return;
-                    }
-
-                    if (dep == null)
-                        U.warn(log, "Failed to deploy class (request will be 
sent): " + jobPda0.deployClass());
-                }
-
-                long reqId = idGen.incrementAndGet();
-
-                fut = curFut;
-
-                reqs.put(reqId, (GridFutureAdapter<Object>)fut);
-
-                GridDataLoadRequest req = new GridDataLoadRequest(
-                    reqId,
-                    topicBytes,
-                    cacheName,
-                    updaterBytes,
-                    entriesBytes,
-                    true,
-                    skipStore,
-                    dep != null ? dep.deployMode() : null,
-                    dep != null ? jobPda0.deployClass().getName() : null,
-                    dep != null ? dep.userVersion() : null,
-                    dep != null ? dep.participants() : null,
-                    dep != null ? dep.classLoaderId() : null,
-                    dep == null);
-
-                try {
-                    ctx.io().send(node, TOPIC_DATALOAD, req, PUBLIC_POOL);
-
-                    if (log.isDebugEnabled())
-                        log.debug("Sent request to node [nodeId=" + node.id() 
+ ", req=" + req + ']');
-                }
-                catch (IgniteCheckedException e) {
-                    if (ctx.discovery().alive(node) && 
ctx.discovery().pingNode(node.id()))
-                        ((GridFutureAdapter<Object>)fut).onDone(e);
-                    else
-                        ((GridFutureAdapter<Object>)fut).onDone(new 
ClusterTopologyException("Failed to send " +
-                            "request (node has left): " + node.id()));
-                }
-            }
-        }
-
-        /**
-         *
-         */
-        void onNodeLeft() {
-            assert !isLocNode;
-            assert bufMappings.get(node.id()) != this;
-
-            if (log.isDebugEnabled())
-                log.debug("Forcibly completing futures (node has left): " + 
node.id());
-
-            Exception e = new ClusterTopologyException("Failed to wait for 
request completion " +
-                "(node has left): " + node.id());
-
-            for (GridFutureAdapter<Object> f : reqs.values())
-                f.onDone(e);
-
-            // Make sure to complete current future.
-            GridFutureAdapter<Object> curFut0;
-
-            synchronized (this) {
-                curFut0 = curFut;
-            }
-
-            curFut0.onDone(e);
-        }
-
-        /**
-         * @param res Response.
-         */
-        void onResponse(GridDataLoadResponse res) {
-            if (log.isDebugEnabled())
-                log.debug("Received data load response: " + res);
-
-            GridFutureAdapter<?> f = reqs.remove(res.requestId());
-
-            if (f == null) {
-                if (log.isDebugEnabled())
-                    log.debug("Future for request has not been found: " + 
res.requestId());
-
-                return;
-            }
-
-            Throwable err = null;
-
-            byte[] errBytes = res.errorBytes();
-
-            if (errBytes != null) {
-                try {
-                    GridPeerDeployAware jobPda0 = jobPda;
-
-                    err = ctx.config().getMarshaller().unmarshal(
-                        errBytes,
-                        jobPda0 != null ? jobPda0.classLoader() : 
U.gridClassLoader());
-                }
-                catch (IgniteCheckedException e) {
-                    f.onDone(null, new IgniteCheckedException("Failed to 
unmarshal response.", e));
-
-                    return;
-                }
-            }
-
-            f.onDone(null, err);
-
-            if (log.isDebugEnabled())
-                log.debug("Finished future [fut=" + f + ", reqId=" + 
res.requestId() + ", err=" + err + ']');
-        }
-
-        /**
-         *
-         */
-        void cancelAll() {
-            IgniteCheckedException err = new IgniteCheckedException("Data 
loader has been cancelled: " + IgniteDataLoaderImpl.this);
-
-            for (IgniteFuture<?> f : locFuts) {
-                try {
-                    f.cancel();
-                }
-                catch (IgniteCheckedException e) {
-                    U.error(log, "Failed to cancel mini-future.", e);
-                }
-            }
-
-            for (GridFutureAdapter<?> f : reqs.values())
-                f.onDone(err);
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            int size;
-
-            synchronized (this) {
-                size = entries.size();
-            }
-
-            return S.toString(Buffer.class, this,
-                "entriesCnt", size,
-                "locFutsSize", locFuts.size(),
-                "reqsSize", reqs.size());
-        }
-    }
-
-    /**
-     * Data loader peer-deploy aware.
-     */
-    private class DataLoaderPda implements GridPeerDeployAware {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** Deploy class. */
-        private Class<?> cls;
-
-        /** Class loader. */
-        private ClassLoader ldr;
-
-        /** Collection of objects to detect deploy class and class loader. */
-        private Collection<Object> objs;
-
-        /**
-         * Constructs data loader peer-deploy aware.
-         *
-         * @param objs Collection of objects to detect deploy class and class 
loader.
-         */
-        private DataLoaderPda(Object... objs) {
-            this.objs = Arrays.asList(objs);
-        }
-
-        /** {@inheritDoc} */
-        @Override public Class<?> deployClass() {
-            if (cls == null) {
-                Class<?> cls0 = null;
-
-                if (depCls != null)
-                    cls0 = depCls;
-                else {
-                    for (Iterator<Object> it = objs.iterator(); (cls0 == null 
|| U.isJdk(cls0)) && it.hasNext();) {
-                        Object o = it.next();
-
-                        if (o != null)
-                            cls0 = U.detectClass(o);
-                    }
-
-                    if (cls0 == null || U.isJdk(cls0))
-                        cls0 = IgniteDataLoaderImpl.class;
-                }
-
-                assert cls0 != null : "Failed to detect deploy class [objs=" + 
objs + ']';
-
-                cls = cls0;
-            }
-
-            return cls;
-        }
-
-        /** {@inheritDoc} */
-        @Override public ClassLoader classLoader() {
-            if (ldr == null) {
-                ClassLoader ldr0 = deployClass().getClassLoader();
-
-                // Safety.
-                if (ldr0 == null)
-                    ldr0 = U.gridClassLoader();
-
-                assert ldr0 != null : "Failed to detect classloader [objs=" + 
objs + ']';
-
-                ldr = ldr0;
-            }
-
-            return ldr;
-        }
-    }
-
-    /**
-     * Entry.
-     */
-    private static class Entry0<K, V> implements Map.Entry<K, V>, 
Externalizable {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** */
-        private K key;
-
-        /** */
-        private V val;
-
-        /**
-         * @param key Key.
-         * @param val Value.
-         */
-        private Entry0(K key, @Nullable V val) {
-            assert key != null;
-
-            this.key = key;
-            this.val = val;
-        }
-
-        /**
-         * For {@link Externalizable}.
-         */
-        @SuppressWarnings("UnusedDeclaration")
-        public Entry0() {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Override public K getKey() {
-            return key;
-        }
-
-        /** {@inheritDoc} */
-        @Override public V getValue() {
-            return val;
-        }
-
-        /** {@inheritDoc} */
-        @Override public V setValue(V val) {
-            V old = this.val;
-
-            this.val = val;
-
-            return old;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void writeExternal(ObjectOutput out) throws 
IOException {
-            out.writeObject(key);
-            out.writeObject(val);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
-            key = (K)in.readObject();
-            val = (V)in.readObject();
-        }
-    }
-
-    /**
-     * Wrapper list with special compact serialization of map entries.
-     */
-    private static class Entries0<K, V> extends 
AbstractCollection<Map.Entry<K, V>> implements Externalizable {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /**  Wrapped delegate. */
-        private Collection<Map.Entry<K, V>> delegate;
-
-        /** Optional portable processor for converting values. */
-        private GridPortableProcessor portable;
-
-        /**
-         * @param delegate Delegate.
-         * @param portable Portable processor.
-         */
-        private Entries0(Collection<Map.Entry<K, V>> delegate, 
GridPortableProcessor portable) {
-            this.delegate = delegate;
-            this.portable = portable;
-        }
-
-        /**
-         * For {@link Externalizable}.
-         */
-        public Entries0() {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Override public Iterator<Entry<K, V>> iterator() {
-            return delegate.iterator();
-        }
-
-        /** {@inheritDoc} */
-        @Override public int size() {
-            return delegate.size();
-        }
-
-        /** {@inheritDoc} */
-        @Override public void writeExternal(ObjectOutput out) throws 
IOException {
-            out.writeInt(delegate.size());
-
-            boolean portableEnabled = portable != null;
-
-            for (Map.Entry<K, V> entry : delegate) {
-                if (portableEnabled) {
-                    
out.writeObject(portable.marshalToPortable(entry.getKey()));
-                    
out.writeObject(portable.marshalToPortable(entry.getValue()));
-                }
-                else {
-                    out.writeObject(entry.getKey());
-                    out.writeObject(entry.getValue());
-                }
-            }
-        }
-
-        /** {@inheritDoc} */
-        @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
-            int sz = in.readInt();
-
-            delegate = new ArrayList<>(sz);
-
-            for (int i = 0; i < sz; i++) {
-                Object k = in.readObject();
-                Object v = in.readObject();
-
-                delegate.add(new Entry0<>((K)k, (V)v));
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/package.html
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/package.html
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/package.html
deleted file mode 100644
index 50a90ff..0000000
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/package.html
+++ /dev/null
@@ -1,23 +0,0 @@
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements.  See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
-  -->
-<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" 
"http://www.w3.org/TR/html4/loose.dtd";>
-<html>
-<body>
-    <!-- Package description. -->
-    Data loader processor.
-</body>
-</html>

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessorAbstractSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessorAbstractSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessorAbstractSelfTest.java
new file mode 100644
index 0000000..71ea41f
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessorAbstractSelfTest.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.affinity;
+
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.affinity.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.gridgain.testframework.*;
+import org.gridgain.testframework.junits.common.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.cache.GridCacheMode.*;
+
+/**
+ * Tests for {@link GridAffinityProcessor}.
+ */
+@GridCommonTest(group = "Affinity Processor")
+public abstract class GridAffinityProcessorAbstractSelfTest extends 
GridCommonAbstractTest {
+    /** Number of grids started for tests. Should not be less than 2. */
+    private static final int NODES_CNT = 3;
+
+    /** Cache name. */
+    private static final String CACHE_NAME = "cache";
+
+    /** IP finder. */
+    private static final TcpDiscoveryIpFinder ipFinder = new 
TcpDiscoveryVmIpFinder(true);
+
+    /** Flag to start grid with cache. */
+    private boolean withCache;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+        discoSpi.setIpFinder(ipFinder);
+
+        cfg.setDiscoverySpi(discoSpi);
+
+        if (withCache) {
+            CacheConfiguration cacheCfg = defaultCacheConfiguration();
+
+            cacheCfg.setName(CACHE_NAME);
+            cacheCfg.setCacheMode(PARTITIONED);
+            cacheCfg.setBackups(1);
+            cacheCfg.setAffinity(affinityFunction());
+
+            cfg.setCacheConfiguration(cacheCfg);
+        }
+
+        return cfg;
+    }
+
+    /**
+     * Creates affinity function for test.
+     *
+     * @return Affinity function.
+     */
+    protected abstract GridCacheAffinityFunction affinityFunction();
+
+    /** {@inheritDoc} */
+    @SuppressWarnings({"ConstantConditions"})
+    @Override protected void beforeTestsStarted() throws Exception {
+        assert NODES_CNT >= 1;
+
+        withCache = false;
+
+        for (int i = 0; i < NODES_CNT; i++)
+            startGrid(i);
+
+        withCache = true;
+
+        for (int i = NODES_CNT; i < 2 * NODES_CNT; i++)
+            startGrid(i);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * Test affinity functions caching and clean up.
+     *
+     * @throws Exception In case of any exception.
+     */
+    @SuppressWarnings("AssertEqualsBetweenInconvertibleTypes")
+    public void testAffinityProcessor() throws Exception {
+        Random rnd = new Random();
+
+        final GridKernal grid1 = (GridKernal)grid(rnd.nextInt(NODES_CNT)); // 
With cache.
+        GridKernal grid2 = (GridKernal)grid(NODES_CNT + 
rnd.nextInt(NODES_CNT)); // Without cache.
+
+        assertEquals(NODES_CNT * 2, grid1.nodes().size());
+        assertEquals(NODES_CNT * 2, grid2.nodes().size());
+
+        GridTestUtils.assertThrows(log, new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                grid1.cache(CACHE_NAME);
+
+                return null;
+            }
+        }, IllegalArgumentException.class, null);
+
+        GridCache<Integer, Integer> cache = grid2.cache(CACHE_NAME);
+
+        assertNotNull(cache);
+
+        GridAffinityProcessor affPrc1 = grid1.context().affinity();
+        GridAffinityProcessor affPrc2 = grid2.context().affinity();
+
+        // Create keys collection.
+        Collection<Integer> keys = new ArrayList<>(1000);
+
+        for (int i = 0; i < 1000; i++)
+            keys.add(i);
+
+        //
+        // Validate affinity functions collection updated on first call.
+        //
+
+        Map<ClusterNode, Collection<Integer>> node1Map = 
affPrc1.mapKeysToNodes(CACHE_NAME, keys);
+        Map<ClusterNode, Collection<Integer>> node2Map = 
affPrc2.mapKeysToNodes(CACHE_NAME, keys);
+        Map<ClusterNode, Collection<Integer>> cacheMap = 
cache.affinity().mapKeysToNodes(keys);
+
+        assertEquals(cacheMap.size(), node1Map.size());
+        assertEquals(cacheMap.size(), node2Map.size());
+
+        for (Map.Entry<ClusterNode, Collection<Integer>> entry : 
cacheMap.entrySet()) {
+            ClusterNode node = entry.getKey();
+
+            Collection<Integer> mappedKeys = entry.getValue();
+
+            Collection<Integer> mapped1 = node1Map.get(node);
+            Collection<Integer> mapped2 = node2Map.get(node);
+
+            assertTrue(mappedKeys.containsAll(mapped1) && 
mapped1.containsAll(mappedKeys));
+            assertTrue(mappedKeys.containsAll(mapped2) && 
mapped2.containsAll(mappedKeys));
+        }
+    }
+
+    /**
+     * Test performance of affinity processor.
+     *
+     * @throws Exception In case of any exception.
+     */
+    public void testPerformance() throws Exception {
+        GridKernal grid = (GridKernal)grid(0);
+        GridAffinityProcessor aff = grid.context().affinity();
+
+        int keysSize = 1000000;
+
+        Collection<Integer> keys = new ArrayList<>(keysSize);
+
+        for (int i = 0; i < keysSize; i++)
+            keys.add(i);
+
+        long start = System.currentTimeMillis();
+
+        int iterations = 10000000;
+
+        for (int i = 0; i < iterations; i++)
+            aff.mapKeyToNode(keys);
+
+        long diff = System.currentTimeMillis() - start;
+
+        info(">>> Map " + keysSize + " keys to " + grid.nodes().size() + " 
nodes " + iterations + " times in " + diff + "ms.");
+
+        assertTrue(diff < 25000);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessorConsistentHashSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessorConsistentHashSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessorConsistentHashSelfTest.java
new file mode 100644
index 0000000..0f8a49e
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessorConsistentHashSelfTest.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.affinity;
+
+import org.apache.ignite.cache.affinity.*;
+import org.apache.ignite.cache.affinity.consistenthash.*;
+
+/**
+ * Tests consistent hash affinity function.
+ */
+public class GridAffinityProcessorConsistentHashSelfTest extends 
GridAffinityProcessorAbstractSelfTest {
+    /** {@inheritDoc} */
+    @Override protected GridCacheAffinityFunction affinityFunction() {
+        return new GridCacheConsistentHashAffinityFunction();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessorRendezvousSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessorRendezvousSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessorRendezvousSelfTest.java
new file mode 100644
index 0000000..fc1831e
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessorRendezvousSelfTest.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.affinity;
+
+import org.apache.ignite.cache.affinity.*;
+import org.apache.ignite.cache.affinity.rendezvous.*;
+
+/**
+ * Tests affinity processor with rendezvous affinity function.
+ */
+public class GridAffinityProcessorRendezvousSelfTest extends 
GridAffinityProcessorAbstractSelfTest {
+    /** {@inheritDoc} */
+    @Override protected GridCacheAffinityFunction affinityFunction() {
+        return new GridCacheRendezvousAffinityFunction();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
index 4542297..4fac368 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
@@ -21,7 +21,7 @@ import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.lang.*;
-import org.gridgain.grid.kernal.processors.continuous.*;
+import org.apache.ignite.internal.processors.continuous.*;
 import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.gridgain.testframework.*;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/test/java/org/apache/ignite/internal/processors/closure/GridClosureProcessorRemoteTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/closure/GridClosureProcessorRemoteTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/closure/GridClosureProcessorRemoteTest.java
new file mode 100644
index 0000000..ec27b86
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/closure/GridClosureProcessorRemoteTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.closure;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.gridgain.testframework.junits.common.*;
+import java.util.*;
+
+/**
+ * Tests execution of anonymous closures on remote nodes.
+ */
+@GridCommonTest(group = "Closure Processor")
+public class GridClosureProcessorRemoteTest extends GridCommonAbstractTest {
+    /**
+     *
+     */
+    public GridClosureProcessorRemoteTest() {
+        super(true); // Start grid.
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getTestGridName() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration() throws 
Exception {
+        IgniteConfiguration cfg = new IgniteConfiguration();
+
+        cfg.setDiscoverySpi(new TcpDiscoverySpi());
+
+        return cfg;
+    }
+
+    /**
+     * @throws Exception Thrown in case of failure.
+     */
+    public void testAnonymousBroadcast() throws Exception {
+        Ignite g = grid();
+
+        assert g.cluster().nodes().size() >= 2;
+
+        g.compute().run(new CA() {
+            @Override public void apply() {
+                System.out.println("BROADCASTING....");
+            }
+        });
+
+        Thread.sleep(2000);
+    }
+
+    /**
+     * @throws Exception Thrown in case of failure.
+     */
+    public void testAnonymousUnicast() throws Exception {
+        Ignite g = grid();
+
+        assert g.cluster().nodes().size() >= 2;
+
+        ClusterNode rmt = F.first(g.cluster().forRemotes().nodes());
+
+        compute(g.cluster().forNode(rmt)).run(new CA() {
+            @Override public void apply() {
+                System.out.println("UNICASTING....");
+            }
+        });
+
+        Thread.sleep(2000);
+    }
+
+    /**
+     *
+     * @throws Exception Thrown in case of failure.
+     */
+    public void testAnonymousUnicastRequest() throws Exception {
+        Ignite g = grid();
+
+        assert g.cluster().nodes().size() >= 2;
+
+        ClusterNode rmt = F.first(g.cluster().forRemotes().nodes());
+        final ClusterNode loc = g.cluster().localNode();
+
+        compute(g.cluster().forNode(rmt)).run(new CA() {
+            @Override public void apply() {
+                message(grid().forNode(loc)).localListen(new 
IgniteBiPredicate<UUID, String>() {
+                    @Override public boolean apply(UUID uuid, String s) {
+                        System.out.println("Received test message [nodeId: " + 
uuid + ", s=" + s + ']');
+
+                        return false;
+                    }
+                }, null);
+            }
+        });
+
+        message(g.cluster().forNode(rmt)).send(null, "TESTING...");
+
+        Thread.sleep(2000);
+    }
+}

Reply via email to