http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2c679e5/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/DataStreamerProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/DataStreamerProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/DataStreamerProcessor.java
new file mode 100644
index 0000000..ee12f7a
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/DataStreamerProcessor.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastream;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.managers.communication.*;
+import org.apache.ignite.internal.managers.deployment.*;
+import org.apache.ignite.internal.processors.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.internal.util.worker.*;
+import org.apache.ignite.marshaller.*;
+import org.apache.ignite.thread.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.internal.GridTopic.*;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
+
+/**
+ *
+ */
+public class DataStreamerProcessor<K, V> extends GridProcessorAdapter {
+    /** Loaders map (access is not supposed to be highly concurrent). */
+    private Collection<DataStreamerImpl> ldrs = new GridConcurrentHashSet<>();
+
+    /** Busy lock. */
+    private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
+
+    /** Flushing thread. */
+    private Thread flusher;
+
+    /** */
+    private final DelayQueue<DataStreamerImpl<K, V>> flushQ = new 
DelayQueue<>();
+
+    /** Marshaller. */
+    private final Marshaller marsh;
+
+    /**
+     * @param ctx Kernal context.
+     */
+    public DataStreamerProcessor(GridKernalContext ctx) {
+        super(ctx);
+
+        ctx.io().addMessageListener(TOPIC_DATASTREAM, new 
GridMessageListener() {
+            @Override public void onMessage(UUID nodeId, Object msg) {
+                assert msg instanceof DataStreamerRequest;
+
+                processRequest(nodeId, (DataStreamerRequest)msg);
+            }
+        });
+
+        marsh = ctx.config().getMarshaller();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void start() throws IgniteCheckedException {
+        if (ctx.config().isDaemon())
+            return;
+
+        flusher = new IgniteThread(new GridWorker(ctx.gridName(), 
"grid-data-loader-flusher", log) {
+            @Override protected void body() throws InterruptedException {
+                while (!isCancelled()) {
+                    DataStreamerImpl<K, V> ldr = flushQ.take();
+
+                    if (!busyLock.enterBusy())
+                        return;
+
+                    try {
+                        if (ldr.isClosed())
+                            continue;
+
+                        ldr.tryFlush();
+
+                        flushQ.offer(ldr);
+                    }
+                    finally {
+                        busyLock.leaveBusy();
+                    }
+                }
+            }
+        });
+
+        flusher.start();
+
+        if (log.isDebugEnabled())
+            log.debug("Started data streamer processor.");
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onKernalStop(boolean cancel) {
+        if (ctx.config().isDaemon())
+            return;
+
+        ctx.io().removeMessageListener(TOPIC_DATASTREAM);
+
+        busyLock.block();
+
+        U.interrupt(flusher);
+        U.join(flusher, log);
+
+        for (DataStreamerImpl<?, ?> ldr : ldrs) {
+            if (log.isDebugEnabled())
+                log.debug("Closing active data streamer on grid stop [ldr=" + 
ldr + ", cancel=" + cancel + ']');
+
+            try {
+                ldr.closeEx(cancel);
+            }
+            catch (IgniteInterruptedCheckedException e) {
+                U.warn(log, "Interrupted while waiting for completion of the 
data streamer: " + ldr, e);
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to close data streamer: " + ldr, e);
+            }
+        }
+
+        if (log.isDebugEnabled())
+            log.debug("Stopped data streamer processor.");
+    }
+
+    /**
+     * @param cacheName Cache name ({@code null} for default cache).
+     * @return Data loader.
+     */
+    public DataStreamerImpl<K, V> dataStreamer(@Nullable String cacheName) {
+        if (!busyLock.enterBusy())
+            throw new IllegalStateException("Failed to create data streamer 
(grid is stopping).");
+
+        try {
+            final DataStreamerImpl<K, V> ldr = new DataStreamerImpl<>(ctx, 
cacheName, flushQ);
+
+            ldrs.add(ldr);
+
+            ldr.internalFuture().listen(new CI1<IgniteInternalFuture<?>>() {
+                @Override public void apply(IgniteInternalFuture<?> f) {
+                    boolean b = ldrs.remove(ldr);
+
+                    assert b : "Loader has not been added to set: " + ldr;
+
+                    if (log.isDebugEnabled())
+                        log.debug("Loader has been completed: " + ldr);
+                }
+            });
+
+            return ldr;
+        }
+        finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * @param nodeId Sender ID.
+     * @param req Request.
+     */
+    private void processRequest(UUID nodeId, DataStreamerRequest req) {
+        if (!busyLock.enterBusy()) {
+            if (log.isDebugEnabled())
+                log.debug("Ignoring data load request (node is stopping): " + 
req);
+
+            return;
+        }
+
+        try {
+            if (log.isDebugEnabled())
+                log.debug("Processing data load request: " + req);
+
+            Object topic;
+
+            try {
+                topic = marsh.unmarshal(req.responseTopicBytes(), null);
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to unmarshal topic from request: " + req, 
e);
+
+                return;
+            }
+
+            ClassLoader clsLdr;
+
+            if (req.forceLocalDeployment())
+                clsLdr = U.gridClassLoader();
+            else {
+                GridDeployment dep = ctx.deploy().getGlobalDeployment(
+                    req.deploymentMode(),
+                    req.sampleClassName(),
+                    req.sampleClassName(),
+                    req.userVersion(),
+                    nodeId,
+                    req.classLoaderId(),
+                    req.participants(),
+                    null);
+
+                if (dep == null) {
+                    sendResponse(nodeId,
+                        topic,
+                        req.requestId(),
+                        new IgniteCheckedException("Failed to get deployment 
for request [sndId=" + nodeId +
+                            ", req=" + req + ']'),
+                        false);
+
+                    return;
+                }
+
+                clsLdr = dep.classLoader();
+            }
+
+            IgniteDataStreamer.Updater<K, V> updater;
+
+            try {
+                updater = marsh.unmarshal(req.updaterBytes(), clsLdr);
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to unmarshal message [nodeId=" + nodeId + 
", req=" + req + ']', e);
+
+                sendResponse(nodeId, topic, req.requestId(), e, false);
+
+                return;
+            }
+
+            Collection<DataStreamerEntry> col = req.entries();
+
+            DataStreamerUpdateJob job = new DataStreamerUpdateJob(ctx,
+                log,
+                req.cacheName(),
+                col,
+                req.ignoreDeploymentOwnership(),
+                req.skipStore(),
+                updater);
+
+            Exception err = null;
+
+            try {
+                job.call();
+            }
+            catch (Exception e) {
+                U.error(log, "Failed to finish update job.", e);
+
+                err = e;
+            }
+
+            sendResponse(nodeId, topic, req.requestId(), err, 
req.forceLocalDeployment());
+        }
+        finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param resTopic Response topic.
+     * @param reqId Request ID.
+     * @param err Error.
+     * @param forceLocDep Force local deployment.
+     */
+    private void sendResponse(UUID nodeId, Object resTopic, long reqId, 
@Nullable Throwable err,
+        boolean forceLocDep) {
+        byte[] errBytes;
+
+        try {
+            errBytes = err != null ? marsh.marshal(err) : null;
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to marshal message.", e);
+
+            return;
+        }
+
+        DataStreamerResponse res = new DataStreamerResponse(reqId, errBytes, 
forceLocDep);
+
+        try {
+            ctx.io().send(nodeId, resTopic, res, PUBLIC_POOL);
+        }
+        catch (IgniteCheckedException e) {
+            if (ctx.discovery().alive(nodeId))
+                U.error(log, "Failed to respond to node [nodeId=" + nodeId + 
", res=" + res + ']', e);
+            else if (log.isDebugEnabled())
+                log.debug("Node has left the grid: " + nodeId);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void printMemoryStats() {
+        X.println(">>>");
+        X.println(">>> Data streamer processor memory stats [grid=" + 
ctx.gridName() + ']');
+        X.println(">>>   ldrsSize: " + ldrs.size());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2c679e5/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/DataStreamerRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/DataStreamerRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/DataStreamerRequest.java
new file mode 100644
index 0000000..1f85ee6
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/DataStreamerRequest.java
@@ -0,0 +1,451 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastream;
+
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.plugin.extensions.communication.*;
+import org.jetbrains.annotations.*;
+
+import java.nio.*;
+import java.util.*;
+
+/**
+ *
+ */
+public class DataStreamerRequest implements Message {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private long reqId;
+
+    /** */
+    private byte[] resTopicBytes;
+
+    /** Cache name. */
+    private String cacheName;
+
+    /** */
+    private byte[] updaterBytes;
+
+    /** Entries to update. */
+    @GridDirectCollection(DataStreamerEntry.class)
+    private Collection<DataStreamerEntry> entries;
+
+    /** {@code True} to ignore deployment ownership. */
+    private boolean ignoreDepOwnership;
+
+    /** */
+    private boolean skipStore;
+
+    /** */
+    private DeploymentMode depMode;
+
+    /** */
+    private String sampleClsName;
+
+    /** */
+    private String userVer;
+
+    /** Node class loader participants. */
+    @GridToStringInclude
+    @GridDirectMap(keyType = UUID.class, valueType = IgniteUuid.class)
+    private Map<UUID, IgniteUuid> ldrParticipants;
+
+    /** */
+    private IgniteUuid clsLdrId;
+
+    /** */
+    private boolean forceLocDep;
+
+    /**
+     * {@code Externalizable} support.
+     */
+    public DataStreamerRequest() {
+        // No-op.
+    }
+
+    /**
+     * @param reqId Request ID.
+     * @param resTopicBytes Response topic.
+     * @param cacheName Cache name.
+     * @param updaterBytes Cache updater.
+     * @param entries Entries to put.
+     * @param ignoreDepOwnership Ignore ownership.
+     * @param skipStore Skip store flag.
+     * @param depMode Deployment mode.
+     * @param sampleClsName Sample class name.
+     * @param userVer User version.
+     * @param ldrParticipants Loader participants.
+     * @param clsLdrId Class loader ID.
+     * @param forceLocDep Force local deployment.
+     */
+    public DataStreamerRequest(long reqId,
+        byte[] resTopicBytes,
+        @Nullable String cacheName,
+        byte[] updaterBytes,
+        Collection<DataStreamerEntry> entries,
+        boolean ignoreDepOwnership,
+        boolean skipStore,
+        DeploymentMode depMode,
+        String sampleClsName,
+        String userVer,
+        Map<UUID, IgniteUuid> ldrParticipants,
+        IgniteUuid clsLdrId,
+        boolean forceLocDep) {
+        this.reqId = reqId;
+        this.resTopicBytes = resTopicBytes;
+        this.cacheName = cacheName;
+        this.updaterBytes = updaterBytes;
+        this.entries = entries;
+        this.ignoreDepOwnership = ignoreDepOwnership;
+        this.skipStore = skipStore;
+        this.depMode = depMode;
+        this.sampleClsName = sampleClsName;
+        this.userVer = userVer;
+        this.ldrParticipants = ldrParticipants;
+        this.clsLdrId = clsLdrId;
+        this.forceLocDep = forceLocDep;
+    }
+
+    /**
+     * @return Request ID.
+     */
+    public long requestId() {
+        return reqId;
+    }
+
+    /**
+     * @return Response topic.
+     */
+    public byte[] responseTopicBytes() {
+        return resTopicBytes;
+    }
+
+    /**
+     * @return Cache name.
+     */
+    public String cacheName() {
+        return cacheName;
+    }
+
+    /**
+     * @return Updater.
+     */
+    public byte[] updaterBytes() {
+        return updaterBytes;
+    }
+
+    /**
+     * @return Entries to update.
+     */
+    public Collection<DataStreamerEntry> entries() {
+        return entries;
+    }
+
+    /**
+     * @return {@code True} to ignore ownership.
+     */
+    public boolean ignoreDeploymentOwnership() {
+        return ignoreDepOwnership;
+    }
+
+    /**
+     * @return Skip store flag.
+     */
+    public boolean skipStore() {
+        return skipStore;
+    }
+
+    /**
+     * @return Deployment mode.
+     */
+    public DeploymentMode deploymentMode() {
+        return depMode;
+    }
+
+    /**
+     * @return Sample class name.
+     */
+    public String sampleClassName() {
+        return sampleClsName;
+    }
+
+    /**
+     * @return User version.
+     */
+    public String userVersion() {
+        return userVer;
+    }
+
+    /**
+     * @return Participants.
+     */
+    public Map<UUID, IgniteUuid> participants() {
+        return ldrParticipants;
+    }
+
+    /**
+     * @return Class loader ID.
+     */
+    public IgniteUuid classLoaderId() {
+        return clsLdrId;
+    }
+
+    /**
+     * @return {@code True} to force local deployment.
+     */
+    public boolean forceLocalDeployment() {
+        return forceLocDep;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(DataStreamerRequest.class, this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeString("cacheName", cacheName))
+                    return false;
+
+                writer.incrementState();
+
+            case 1:
+                if (!writer.writeIgniteUuid("clsLdrId", clsLdrId))
+                    return false;
+
+                writer.incrementState();
+
+            case 2:
+                if (!writer.writeByte("depMode", depMode != null ? 
(byte)depMode.ordinal() : -1))
+                    return false;
+
+                writer.incrementState();
+
+            case 3:
+                if (!writer.writeCollection("entries", entries, 
MessageCollectionItemType.MSG))
+                    return false;
+
+                writer.incrementState();
+
+            case 4:
+                if (!writer.writeBoolean("forceLocDep", forceLocDep))
+                    return false;
+
+                writer.incrementState();
+
+            case 5:
+                if (!writer.writeBoolean("ignoreDepOwnership", 
ignoreDepOwnership))
+                    return false;
+
+                writer.incrementState();
+
+            case 6:
+                if (!writer.writeMap("ldrParticipants", ldrParticipants, 
MessageCollectionItemType.UUID, MessageCollectionItemType.IGNITE_UUID))
+                    return false;
+
+                writer.incrementState();
+
+            case 7:
+                if (!writer.writeLong("reqId", reqId))
+                    return false;
+
+                writer.incrementState();
+
+            case 8:
+                if (!writer.writeByteArray("resTopicBytes", resTopicBytes))
+                    return false;
+
+                writer.incrementState();
+
+            case 9:
+                if (!writer.writeString("sampleClsName", sampleClsName))
+                    return false;
+
+                writer.incrementState();
+
+            case 10:
+                if (!writer.writeBoolean("skipStore", skipStore))
+                    return false;
+
+                writer.incrementState();
+
+            case 11:
+                if (!writer.writeByteArray("updaterBytes", updaterBytes))
+                    return false;
+
+                writer.incrementState();
+
+            case 12:
+                if (!writer.writeString("userVer", userVer))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) {
+            case 0:
+                cacheName = reader.readString("cacheName");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 1:
+                clsLdrId = reader.readIgniteUuid("clsLdrId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 2:
+                byte depModeOrd;
+
+                depModeOrd = reader.readByte("depMode");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                depMode = DeploymentMode.fromOrdinal(depModeOrd);
+
+                reader.incrementState();
+
+            case 3:
+                entries = reader.readCollection("entries", 
MessageCollectionItemType.MSG);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 4:
+                forceLocDep = reader.readBoolean("forceLocDep");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 5:
+                ignoreDepOwnership = reader.readBoolean("ignoreDepOwnership");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 6:
+                ldrParticipants = reader.readMap("ldrParticipants", 
MessageCollectionItemType.UUID, MessageCollectionItemType.IGNITE_UUID, false);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 7:
+                reqId = reader.readLong("reqId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 8:
+                resTopicBytes = reader.readByteArray("resTopicBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 9:
+                sampleClsName = reader.readString("sampleClsName");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 10:
+                skipStore = reader.readBoolean("skipStore");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 11:
+                updaterBytes = reader.readByteArray("updaterBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 12:
+                userVer = reader.readString("userVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return 62;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 13;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2c679e5/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/DataStreamerResponse.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/DataStreamerResponse.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/DataStreamerResponse.java
new file mode 100644
index 0000000..a953fb5
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/DataStreamerResponse.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastream;
+
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.plugin.extensions.communication.*;
+
+import java.nio.*;
+
+/**
+ *
+ */
+public class DataStreamerResponse implements Message {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private long reqId;
+
+    /** */
+    private byte[] errBytes;
+
+    /** */
+    private boolean forceLocDep;
+
+    /**
+     * @param reqId Request ID.
+     * @param errBytes Error bytes.
+     * @param forceLocDep Force local deployment.
+     */
+    public DataStreamerResponse(long reqId, byte[] errBytes, boolean 
forceLocDep) {
+        this.reqId = reqId;
+        this.errBytes = errBytes;
+        this.forceLocDep = forceLocDep;
+    }
+
+    /**
+     * {@code Externalizable} support.
+     */
+    public DataStreamerResponse() {
+        // No-op.
+    }
+
+    /**
+     * @return Request ID.
+     */
+    public long requestId() {
+        return reqId;
+    }
+
+    /**
+     * @return Error bytes.
+     */
+    public byte[] errorBytes() {
+        return errBytes;
+    }
+
+    /**
+     * @return {@code True} to force local deployment.
+     */
+    public boolean forceLocalDeployment() {
+        return forceLocDep;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(DataStreamerResponse.class, this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeByteArray("errBytes", errBytes))
+                    return false;
+
+                writer.incrementState();
+
+            case 1:
+                if (!writer.writeBoolean("forceLocDep", forceLocDep))
+                    return false;
+
+                writer.incrementState();
+
+            case 2:
+                if (!writer.writeLong("reqId", reqId))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) {
+            case 0:
+                errBytes = reader.readByteArray("errBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 1:
+                forceLocDep = reader.readBoolean("forceLocDep");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 2:
+                reqId = reader.readLong("reqId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return 63;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 3;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2c679e5/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/DataStreamerUpdateJob.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/DataStreamerUpdateJob.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/DataStreamerUpdateJob.java
new file mode 100644
index 0000000..a64a108
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/DataStreamerUpdateJob.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastream;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Job to put entries to cache on affinity node.
+ */
+class DataStreamerUpdateJob implements GridPlainCallable<Object> {
+    /** */
+    private final GridKernalContext ctx;
+
+    /** */
+    private final IgniteLogger log;
+
+    /** Cache name. */
+    private final String cacheName;
+
+    /** Entries to put. */
+    private final Collection<DataStreamerEntry> col;
+
+    /** {@code True} to ignore deployment ownership. */
+    private final boolean ignoreDepOwnership;
+
+    /** */
+    private final boolean skipStore;
+
+    /** */
+    private final IgniteDataStreamer.Updater updater;
+
+    /**
+     * @param ctx Context.
+     * @param log Log.
+     * @param cacheName Cache name.
+     * @param col Entries to put.
+     * @param ignoreDepOwnership {@code True} to ignore deployment ownership.
+     * @param skipStore Skip store flag.
+     * @param updater Updater.
+     */
+    DataStreamerUpdateJob(
+        GridKernalContext ctx,
+        IgniteLogger log,
+        @Nullable String cacheName,
+        Collection<DataStreamerEntry> col,
+        boolean ignoreDepOwnership,
+        boolean skipStore,
+        IgniteDataStreamer.Updater<?, ?> updater) {
+        this.ctx = ctx;
+        this.log = log;
+
+        assert col != null && !col.isEmpty();
+        assert updater != null;
+
+        this.cacheName = cacheName;
+        this.col = col;
+        this.ignoreDepOwnership = ignoreDepOwnership;
+        this.skipStore = skipStore;
+        this.updater = updater;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public Object call() throws Exception {
+        if (log.isDebugEnabled())
+            log.debug("Running put job [nodeId=" + ctx.localNodeId() + ", 
size=" + col.size() + ']');
+
+//        TODO IGNITE-77: restore adapter usage.
+//        GridCacheAdapter<Object, Object> cache = 
ctx.cache().internalCache(cacheName);
+//
+//        IgniteFuture<?> f = cache.context().preloader().startFuture();
+//
+//        if (!f.isDone())
+//            f.get();
+//
+//        if (ignoreDepOwnership)
+//            cache.context().deploy().ignoreOwnership(true);
+
+        IgniteCacheProxy cache = ctx.cache().jcache(cacheName);
+
+        if (skipStore)
+            cache = (IgniteCacheProxy<?, ?>)cache.withSkipStore();
+
+        if (ignoreDepOwnership)
+            cache.context().deploy().ignoreOwnership(true);
+
+        try {
+            final GridCacheContext cctx = cache.context();
+
+            for (DataStreamerEntry e : col) {
+                e.getKey().finishUnmarshal(cctx.cacheObjectContext(), 
cctx.deploy().globalLoader());
+
+                CacheObject val = e.getValue();
+
+                if (val != null)
+                    val.finishUnmarshal(cctx.cacheObjectContext(), 
cctx.deploy().globalLoader());
+            }
+
+            if (unwrapEntries()) {
+                Collection<Map.Entry> col0 = F.viewReadOnly(col, new 
C1<DataStreamerEntry, Map.Entry>() {
+                    @Override public Map.Entry apply(DataStreamerEntry e) {
+                        return e.toEntry(cctx);
+                    }
+                });
+
+                updater.update(cache, col0);
+            }
+            else
+                updater.update(cache, col);
+
+            return null;
+        }
+        finally {
+            if (ignoreDepOwnership)
+                cache.context().deploy().ignoreOwnership(false);
+
+            if (log.isDebugEnabled())
+                log.debug("Update job finished on node: " + ctx.localNodeId());
+        }
+    }
+
+    /**
+     * @return {@code True} if need to unwrap internal entries.
+     */
+    private boolean unwrapEntries() {
+        return !(updater instanceof DataStreamerCacheUpdaters.InternalUpdater);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2c679e5/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/GridDataLoadRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/GridDataLoadRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/GridDataLoadRequest.java
deleted file mode 100644
index 8c66812..0000000
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/GridDataLoadRequest.java
+++ /dev/null
@@ -1,451 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.datastream;
-
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.util.tostring.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.plugin.extensions.communication.*;
-import org.jetbrains.annotations.*;
-
-import java.nio.*;
-import java.util.*;
-
-/**
- *
- */
-public class GridDataLoadRequest implements Message {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** */
-    private long reqId;
-
-    /** */
-    private byte[] resTopicBytes;
-
-    /** Cache name. */
-    private String cacheName;
-
-    /** */
-    private byte[] updaterBytes;
-
-    /** Entries to update. */
-    @GridDirectCollection(IgniteDataLoaderEntry.class)
-    private Collection<IgniteDataLoaderEntry> entries;
-
-    /** {@code True} to ignore deployment ownership. */
-    private boolean ignoreDepOwnership;
-
-    /** */
-    private boolean skipStore;
-
-    /** */
-    private DeploymentMode depMode;
-
-    /** */
-    private String sampleClsName;
-
-    /** */
-    private String userVer;
-
-    /** Node class loader participants. */
-    @GridToStringInclude
-    @GridDirectMap(keyType = UUID.class, valueType = IgniteUuid.class)
-    private Map<UUID, IgniteUuid> ldrParticipants;
-
-    /** */
-    private IgniteUuid clsLdrId;
-
-    /** */
-    private boolean forceLocDep;
-
-    /**
-     * {@code Externalizable} support.
-     */
-    public GridDataLoadRequest() {
-        // No-op.
-    }
-
-    /**
-     * @param reqId Request ID.
-     * @param resTopicBytes Response topic.
-     * @param cacheName Cache name.
-     * @param updaterBytes Cache updater.
-     * @param entries Entries to put.
-     * @param ignoreDepOwnership Ignore ownership.
-     * @param skipStore Skip store flag.
-     * @param depMode Deployment mode.
-     * @param sampleClsName Sample class name.
-     * @param userVer User version.
-     * @param ldrParticipants Loader participants.
-     * @param clsLdrId Class loader ID.
-     * @param forceLocDep Force local deployment.
-     */
-    public GridDataLoadRequest(long reqId,
-        byte[] resTopicBytes,
-        @Nullable String cacheName,
-        byte[] updaterBytes,
-        Collection<IgniteDataLoaderEntry> entries,
-        boolean ignoreDepOwnership,
-        boolean skipStore,
-        DeploymentMode depMode,
-        String sampleClsName,
-        String userVer,
-        Map<UUID, IgniteUuid> ldrParticipants,
-        IgniteUuid clsLdrId,
-        boolean forceLocDep) {
-        this.reqId = reqId;
-        this.resTopicBytes = resTopicBytes;
-        this.cacheName = cacheName;
-        this.updaterBytes = updaterBytes;
-        this.entries = entries;
-        this.ignoreDepOwnership = ignoreDepOwnership;
-        this.skipStore = skipStore;
-        this.depMode = depMode;
-        this.sampleClsName = sampleClsName;
-        this.userVer = userVer;
-        this.ldrParticipants = ldrParticipants;
-        this.clsLdrId = clsLdrId;
-        this.forceLocDep = forceLocDep;
-    }
-
-    /**
-     * @return Request ID.
-     */
-    public long requestId() {
-        return reqId;
-    }
-
-    /**
-     * @return Response topic.
-     */
-    public byte[] responseTopicBytes() {
-        return resTopicBytes;
-    }
-
-    /**
-     * @return Cache name.
-     */
-    public String cacheName() {
-        return cacheName;
-    }
-
-    /**
-     * @return Updater.
-     */
-    public byte[] updaterBytes() {
-        return updaterBytes;
-    }
-
-    /**
-     * @return Entries to update.
-     */
-    public Collection<IgniteDataLoaderEntry> entries() {
-        return entries;
-    }
-
-    /**
-     * @return {@code True} to ignore ownership.
-     */
-    public boolean ignoreDeploymentOwnership() {
-        return ignoreDepOwnership;
-    }
-
-    /**
-     * @return Skip store flag.
-     */
-    public boolean skipStore() {
-        return skipStore;
-    }
-
-    /**
-     * @return Deployment mode.
-     */
-    public DeploymentMode deploymentMode() {
-        return depMode;
-    }
-
-    /**
-     * @return Sample class name.
-     */
-    public String sampleClassName() {
-        return sampleClsName;
-    }
-
-    /**
-     * @return User version.
-     */
-    public String userVersion() {
-        return userVer;
-    }
-
-    /**
-     * @return Participants.
-     */
-    public Map<UUID, IgniteUuid> participants() {
-        return ldrParticipants;
-    }
-
-    /**
-     * @return Class loader ID.
-     */
-    public IgniteUuid classLoaderId() {
-        return clsLdrId;
-    }
-
-    /**
-     * @return {@code True} to force local deployment.
-     */
-    public boolean forceLocalDeployment() {
-        return forceLocDep;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridDataLoadRequest.class, this);
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
-        writer.setBuffer(buf);
-
-        if (!writer.isHeaderWritten()) {
-            if (!writer.writeHeader(directType(), fieldsCount()))
-                return false;
-
-            writer.onHeaderWritten();
-        }
-
-        switch (writer.state()) {
-            case 0:
-                if (!writer.writeString("cacheName", cacheName))
-                    return false;
-
-                writer.incrementState();
-
-            case 1:
-                if (!writer.writeIgniteUuid("clsLdrId", clsLdrId))
-                    return false;
-
-                writer.incrementState();
-
-            case 2:
-                if (!writer.writeByte("depMode", depMode != null ? 
(byte)depMode.ordinal() : -1))
-                    return false;
-
-                writer.incrementState();
-
-            case 3:
-                if (!writer.writeCollection("entries", entries, 
MessageCollectionItemType.MSG))
-                    return false;
-
-                writer.incrementState();
-
-            case 4:
-                if (!writer.writeBoolean("forceLocDep", forceLocDep))
-                    return false;
-
-                writer.incrementState();
-
-            case 5:
-                if (!writer.writeBoolean("ignoreDepOwnership", 
ignoreDepOwnership))
-                    return false;
-
-                writer.incrementState();
-
-            case 6:
-                if (!writer.writeMap("ldrParticipants", ldrParticipants, 
MessageCollectionItemType.UUID, MessageCollectionItemType.IGNITE_UUID))
-                    return false;
-
-                writer.incrementState();
-
-            case 7:
-                if (!writer.writeLong("reqId", reqId))
-                    return false;
-
-                writer.incrementState();
-
-            case 8:
-                if (!writer.writeByteArray("resTopicBytes", resTopicBytes))
-                    return false;
-
-                writer.incrementState();
-
-            case 9:
-                if (!writer.writeString("sampleClsName", sampleClsName))
-                    return false;
-
-                writer.incrementState();
-
-            case 10:
-                if (!writer.writeBoolean("skipStore", skipStore))
-                    return false;
-
-                writer.incrementState();
-
-            case 11:
-                if (!writer.writeByteArray("updaterBytes", updaterBytes))
-                    return false;
-
-                writer.incrementState();
-
-            case 12:
-                if (!writer.writeString("userVer", userVer))
-                    return false;
-
-                writer.incrementState();
-
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
-        reader.setBuffer(buf);
-
-        if (!reader.beforeMessageRead())
-            return false;
-
-        switch (reader.state()) {
-            case 0:
-                cacheName = reader.readString("cacheName");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 1:
-                clsLdrId = reader.readIgniteUuid("clsLdrId");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 2:
-                byte depModeOrd;
-
-                depModeOrd = reader.readByte("depMode");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                depMode = DeploymentMode.fromOrdinal(depModeOrd);
-
-                reader.incrementState();
-
-            case 3:
-                entries = reader.readCollection("entries", 
MessageCollectionItemType.MSG);
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 4:
-                forceLocDep = reader.readBoolean("forceLocDep");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 5:
-                ignoreDepOwnership = reader.readBoolean("ignoreDepOwnership");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 6:
-                ldrParticipants = reader.readMap("ldrParticipants", 
MessageCollectionItemType.UUID, MessageCollectionItemType.IGNITE_UUID, false);
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 7:
-                reqId = reader.readLong("reqId");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 8:
-                resTopicBytes = reader.readByteArray("resTopicBytes");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 9:
-                sampleClsName = reader.readString("sampleClsName");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 10:
-                skipStore = reader.readBoolean("skipStore");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 11:
-                updaterBytes = reader.readByteArray("updaterBytes");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 12:
-                userVer = reader.readString("userVer");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public byte directType() {
-        return 62;
-    }
-
-    /** {@inheritDoc} */
-    @Override public byte fieldsCount() {
-        return 13;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2c679e5/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/GridDataLoadResponse.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/GridDataLoadResponse.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/GridDataLoadResponse.java
deleted file mode 100644
index 25ff9ce..0000000
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/GridDataLoadResponse.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.datastream;
-
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.plugin.extensions.communication.*;
-
-import java.nio.*;
-
-/**
- *
- */
-public class GridDataLoadResponse implements Message {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** */
-    private long reqId;
-
-    /** */
-    private byte[] errBytes;
-
-    /** */
-    private boolean forceLocDep;
-
-    /**
-     * @param reqId Request ID.
-     * @param errBytes Error bytes.
-     * @param forceLocDep Force local deployment.
-     */
-    public GridDataLoadResponse(long reqId, byte[] errBytes, boolean 
forceLocDep) {
-        this.reqId = reqId;
-        this.errBytes = errBytes;
-        this.forceLocDep = forceLocDep;
-    }
-
-    /**
-     * {@code Externalizable} support.
-     */
-    public GridDataLoadResponse() {
-        // No-op.
-    }
-
-    /**
-     * @return Request ID.
-     */
-    public long requestId() {
-        return reqId;
-    }
-
-    /**
-     * @return Error bytes.
-     */
-    public byte[] errorBytes() {
-        return errBytes;
-    }
-
-    /**
-     * @return {@code True} to force local deployment.
-     */
-    public boolean forceLocalDeployment() {
-        return forceLocDep;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridDataLoadResponse.class, this);
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
-        writer.setBuffer(buf);
-
-        if (!writer.isHeaderWritten()) {
-            if (!writer.writeHeader(directType(), fieldsCount()))
-                return false;
-
-            writer.onHeaderWritten();
-        }
-
-        switch (writer.state()) {
-            case 0:
-                if (!writer.writeByteArray("errBytes", errBytes))
-                    return false;
-
-                writer.incrementState();
-
-            case 1:
-                if (!writer.writeBoolean("forceLocDep", forceLocDep))
-                    return false;
-
-                writer.incrementState();
-
-            case 2:
-                if (!writer.writeLong("reqId", reqId))
-                    return false;
-
-                writer.incrementState();
-
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
-        reader.setBuffer(buf);
-
-        if (!reader.beforeMessageRead())
-            return false;
-
-        switch (reader.state()) {
-            case 0:
-                errBytes = reader.readByteArray("errBytes");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 1:
-                forceLocDep = reader.readBoolean("forceLocDep");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 2:
-                reqId = reader.readLong("reqId");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public byte directType() {
-        return 63;
-    }
-
-    /** {@inheritDoc} */
-    @Override public byte fieldsCount() {
-        return 3;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2c679e5/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataLoaderEntry.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataLoaderEntry.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataLoaderEntry.java
deleted file mode 100644
index 94f908b..0000000
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataLoaderEntry.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.datastream;
-
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.util.tostring.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.plugin.extensions.communication.*;
-
-import java.nio.*;
-import java.util.*;
-
-/**
- *
- */
-public class IgniteDataLoaderEntry implements Map.Entry<KeyCacheObject, 
CacheObject>, Message {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** */
-    @GridToStringInclude
-    protected KeyCacheObject key;
-
-    /** */
-    @GridToStringInclude
-    protected CacheObject val;
-
-    /**
-     *
-     */
-    public IgniteDataLoaderEntry() {
-        // No-op.
-    }
-
-    /**
-     * @param key Key.
-     * @param val Value.
-     */
-    public IgniteDataLoaderEntry(KeyCacheObject key, CacheObject val) {
-        this.key = key;
-        this.val = val;
-    }
-
-    /** {@inheritDoc} */
-    @Override public KeyCacheObject getKey() {
-        return key;
-    }
-
-    /** {@inheritDoc} */
-    @Override public CacheObject getValue() {
-        return val;
-    }
-
-    /** {@inheritDoc} */
-    @Override public CacheObject setValue(CacheObject val) {
-        CacheObject old = this.val;
-
-        this.val = val;
-
-        return old;
-    }
-
-    /**
-     * @param ctx Cache context.
-     * @return Map entry unwrapping internal key and value.
-     */
-    public <K, V> Map.Entry<K, V> toEntry(final GridCacheContext ctx) {
-        return new Map.Entry<K, V>() {
-            @Override public K getKey() {
-                return key.value(ctx.cacheObjectContext(), false);
-            }
-
-            @Override public V setValue(V val) {
-                throw new UnsupportedOperationException();
-            }
-
-            @Override public V getValue() {
-                return val != null ? val.<V>value(ctx.cacheObjectContext(), 
false) : null;
-            }
-        };
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
-        writer.setBuffer(buf);
-
-        if (!writer.isHeaderWritten()) {
-            if (!writer.writeHeader(directType(), fieldsCount()))
-                return false;
-
-            writer.onHeaderWritten();
-        }
-
-        switch (writer.state()) {
-            case 0:
-                if (!writer.writeMessage("key", key))
-                    return false;
-
-                writer.incrementState();
-
-            case 1:
-                if (!writer.writeMessage("val", val))
-                    return false;
-
-                writer.incrementState();
-
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
-        reader.setBuffer(buf);
-
-        if (!reader.beforeMessageRead())
-            return false;
-
-        switch (reader.state()) {
-            case 0:
-                key = reader.readMessage("key");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 1:
-                val = reader.readMessage("val");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public byte directType() {
-        return 95;
-    }
-
-    /** {@inheritDoc} */
-    @Override public byte fieldsCount() {
-        return 2;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(IgniteDataLoaderEntry.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2c679e5/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerCacheUpdaters.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerCacheUpdaters.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerCacheUpdaters.java
deleted file mode 100644
index af3f358..0000000
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerCacheUpdaters.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.datastream;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-
-/**
- * Bundled factory for cache updaters.
- */
-public class IgniteDataStreamerCacheUpdaters {
-    /** */
-    private static final IgniteDataStreamer.Updater INDIVIDUAL = new 
Individual();
-
-    /** */
-    private static final IgniteDataStreamer.Updater BATCHED = new Batched();
-
-    /** */
-    private static final IgniteDataStreamer.Updater BATCHED_SORTED = new 
BatchedSorted();
-
-    /**
-     * Updates cache using independent {@link 
org.apache.ignite.cache.GridCache#put(Object, Object, 
org.apache.ignite.lang.IgnitePredicate[])} and
-     * {@link org.apache.ignite.cache.GridCache#remove(Object, 
org.apache.ignite.lang.IgnitePredicate[])} operations. Thus it is safe from 
deadlocks but performance
-     * is not the best.
-     *
-     * @return Single updater.
-     */
-    public static <K, V> IgniteDataStreamer.Updater<K, V> individual() {
-        return INDIVIDUAL;
-    }
-
-    /**
-     * Updates cache using batched methods {@link 
org.apache.ignite.cache.GridCache#putAll(Map, 
org.apache.ignite.lang.IgnitePredicate[])} and
-     * {@link org.apache.ignite.cache.GridCache#removeAll(Collection, 
org.apache.ignite.lang.IgnitePredicate[])}. Can cause deadlocks if the same 
keys are getting
-     * updated concurrently. Performance is generally better than in {@link 
#individual()}.
-     *
-     * @return Batched updater.
-     */
-    public static <K, V> IgniteDataStreamer.Updater<K, V> batched() {
-        return BATCHED;
-    }
-
-    /**
-     * Updates cache using batched methods {@link 
org.apache.ignite.cache.GridCache#putAll(Map, 
org.apache.ignite.lang.IgnitePredicate[])} and
-     * {@link org.apache.ignite.cache.GridCache#removeAll(Collection, 
org.apache.ignite.lang.IgnitePredicate[])}. Keys are sorted in natural order 
and if all updates
-     * use the same rule deadlock can not happen. Performance is generally 
better than in {@link #individual()}.
-     *
-     * @return Batched sorted updater.
-     */
-    public static <K extends Comparable<?>, V> IgniteDataStreamer.Updater<K, 
V> batchedSorted() {
-        return BATCHED_SORTED;
-    }
-
-    /**
-     * Updates cache.
-     *
-     * @param cache Cache.
-     * @param rmvCol Keys to remove.
-     * @param putMap Entries to put.
-     * @throws IgniteException If failed.
-     */
-    protected static <K, V> void updateAll(IgniteCache<K, V> cache, @Nullable 
Set<K> rmvCol,
-        Map<K, V> putMap) {
-        assert rmvCol != null || putMap != null;
-
-        // Here we assume that there are no key duplicates, so the following 
calls are valid.
-        if (rmvCol != null)
-            ((IgniteCacheProxy<K, V>)cache).removeAll(rmvCol);
-
-        if (putMap != null)
-            cache.putAll(putMap);
-    }
-
-    /**
-     * Simple cache updater implementation. Updates keys one by one thus is 
not dead lock prone.
-     */
-    private static class Individual<K, V> implements 
IgniteDataStreamer.Updater<K, V>, InternalUpdater {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** {@inheritDoc} */
-        @Override public void update(IgniteCache<K, V> cache, 
Collection<Map.Entry<K, V>> entries) {
-            assert cache != null;
-            assert !F.isEmpty(entries);
-
-            for (Map.Entry<K, V> entry : entries) {
-                K key = entry.getKey();
-
-                assert key != null;
-
-                V val = entry.getValue();
-
-                if (val == null)
-                    cache.remove(key);
-                else
-                    cache.put(key, val);
-            }
-        }
-    }
-
-    /**
-     * Batched updater. Updates cache using batch operations thus is dead lock 
prone.
-     */
-    private static class Batched<K, V> implements 
IgniteDataStreamer.Updater<K, V>, InternalUpdater {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** {@inheritDoc} */
-        @Override public void update(IgniteCache<K, V> cache, 
Collection<Map.Entry<K, V>> entries) {
-            assert cache != null;
-            assert !F.isEmpty(entries);
-
-            Map<K, V> putAll = null;
-            Set<K> rmvAll = null;
-
-            for (Map.Entry<K, V> entry : entries) {
-                K key = entry.getKey();
-
-                assert key != null;
-
-                V val = entry.getValue();
-
-                if (val == null) {
-                    if (rmvAll == null)
-                        rmvAll = new HashSet<>();
-
-                    rmvAll.add(key);
-                }
-                else {
-                    if (putAll == null)
-                        putAll = new HashMap<>();
-
-                    putAll.put(key, val);
-                }
-            }
-
-            updateAll(cache, rmvAll, putAll);
-        }
-    }
-
-    /**
-     * Batched updater. Updates cache using batch operations thus is dead lock 
prone.
-     */
-    private static class BatchedSorted<K, V> implements 
IgniteDataStreamer.Updater<K, V>, InternalUpdater {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** {@inheritDoc} */
-        @Override public void update(IgniteCache<K, V> cache, 
Collection<Map.Entry<K, V>> entries) {
-            assert cache != null;
-            assert !F.isEmpty(entries);
-
-            Map<K, V> putAll = null;
-            Set<K> rmvAll = null;
-
-            for (Map.Entry<K, V> entry : entries) {
-                K key = entry.getKey();
-
-                assert key instanceof Comparable;
-
-                V val = entry.getValue();
-
-                if (val == null) {
-                    if (rmvAll == null)
-                        rmvAll = new TreeSet<>();
-
-                    rmvAll.add(key);
-                }
-                else {
-                    if (putAll == null)
-                        putAll = new TreeMap<>();
-
-                    putAll.put(key, val);
-                }
-            }
-
-            updateAll(cache, rmvAll, putAll);
-        }
-    }
-
-    /**
-     * Marker interface for updaters which do not need to unwrap cache objects.
-     */
-    public static interface InternalUpdater {
-        // No-op.
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2c679e5/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerFuture.java
deleted file mode 100644
index 8ed4271..0000000
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerFuture.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.datastream;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.util.future.*;
-import org.apache.ignite.internal.util.tostring.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-
-/**
- * Data streamer future.
- */
-class IgniteDataStreamerFuture extends GridFutureAdapter<Object> {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Data loader. */
-    @GridToStringExclude
-    private IgniteDataStreamerImpl dataLdr;
-
-    /**
-     * Default constructor for {@link java.io.Externalizable} support.
-     */
-    public IgniteDataStreamerFuture() {
-        // No-op.
-    }
-
-    /**
-     * @param ctx Context.
-     * @param dataLdr Data streamer.
-     */
-    IgniteDataStreamerFuture(GridKernalContext ctx, IgniteDataStreamerImpl 
dataLdr) {
-        assert dataLdr != null;
-
-        this.dataLdr = dataLdr;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean cancel() throws IgniteCheckedException {
-        if (onCancelled()) {
-            dataLdr.closeEx(true);
-
-            return true;
-        }
-
-        return false;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(IgniteDataStreamerFuture.class, this, 
super.toString());
-    }
-}

Reply via email to