http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessor.java deleted file mode 100644 index 7db41e6..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessor.java +++ /dev/null @@ -1,316 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.dataload; - -import org.apache.ignite.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.managers.communication.*; -import org.apache.ignite.internal.managers.deployment.*; -import org.apache.ignite.internal.processors.*; -import org.apache.ignite.internal.util.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.internal.util.worker.*; -import org.apache.ignite.marshaller.*; -import org.apache.ignite.thread.*; -import org.jetbrains.annotations.*; - -import java.util.*; -import java.util.concurrent.*; - -import static org.apache.ignite.internal.GridTopic.*; -import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*; - -/** - * - */ -public class IgniteDataStreamerProcessor<K, V> extends GridProcessorAdapter { - /** Loaders map (access is not supposed to be highly concurrent). */ - private Collection<IgniteDataStreamerImpl> ldrs = new GridConcurrentHashSet<>(); - - /** Busy lock. */ - private final GridSpinBusyLock busyLock = new GridSpinBusyLock(); - - /** Flushing thread. */ - private Thread flusher; - - /** */ - private final DelayQueue<IgniteDataStreamerImpl<K, V>> flushQ = new DelayQueue<>(); - - /** Marshaller. */ - private final Marshaller marsh; - - /** - * @param ctx Kernal context. - */ - public IgniteDataStreamerProcessor(GridKernalContext ctx) { - super(ctx); - - ctx.io().addMessageListener(TOPIC_DATALOAD, new GridMessageListener() { - @Override public void onMessage(UUID nodeId, Object msg) { - assert msg instanceof GridDataLoadRequest; - - processDataLoadRequest(nodeId, (GridDataLoadRequest)msg); - } - }); - - marsh = ctx.config().getMarshaller(); - } - - /** {@inheritDoc} */ - @Override public void start() throws IgniteCheckedException { - if (ctx.config().isDaemon()) - return; - - flusher = new IgniteThread(new GridWorker(ctx.gridName(), "grid-data-loader-flusher", log) { - @Override protected void body() throws InterruptedException { - while (!isCancelled()) { - IgniteDataStreamerImpl<K, V> ldr = flushQ.take(); - - if (!busyLock.enterBusy()) - return; - - try { - if (ldr.isClosed()) - continue; - - ldr.tryFlush(); - - flushQ.offer(ldr); - } - finally { - busyLock.leaveBusy(); - } - } - } - }); - - flusher.start(); - - if (log.isDebugEnabled()) - log.debug("Started data streamer processor."); - } - - /** {@inheritDoc} */ - @Override public void onKernalStop(boolean cancel) { - if (ctx.config().isDaemon()) - return; - - ctx.io().removeMessageListener(TOPIC_DATALOAD); - - busyLock.block(); - - U.interrupt(flusher); - U.join(flusher, log); - - for (IgniteDataStreamerImpl<?, ?> ldr : ldrs) { - if (log.isDebugEnabled()) - log.debug("Closing active data streamer on grid stop [ldr=" + ldr + ", cancel=" + cancel + ']'); - - try { - ldr.closeEx(cancel); - } - catch (IgniteInterruptedCheckedException e) { - U.warn(log, "Interrupted while waiting for completion of the data streamer: " + ldr, e); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to close data streamer: " + ldr, e); - } - } - - if (log.isDebugEnabled()) - log.debug("Stopped data streamer processor."); - } - - /** - * @param cacheName Cache name ({@code null} for default cache). - * @param compact {@code true} if data streamer should transfer data in compact format. - * @return Data streamer. - */ - public IgniteDataStreamerImpl<K, V> dataStreamer(@Nullable String cacheName, boolean compact) { - if (!busyLock.enterBusy()) - throw new IllegalStateException("Failed to create data streamer (grid is stopping)."); - - try { - final IgniteDataStreamerImpl<K, V> ldr = new IgniteDataStreamerImpl<>(ctx, cacheName, flushQ, compact); - - ldrs.add(ldr); - - ldr.internalFuture().listenAsync(new CI1<IgniteInternalFuture<?>>() { - @Override public void apply(IgniteInternalFuture<?> f) { - boolean b = ldrs.remove(ldr); - - assert b : "Loader has not been added to set: " + ldr; - - if (log.isDebugEnabled()) - log.debug("Loader has been completed: " + ldr); - } - }); - - return ldr; - } - finally { - busyLock.leaveBusy(); - } - } - - /** - * @param cacheName Cache name ({@code null} for default cache). - * @return Data streamer. - */ - public IgniteDataStreamer<K, V> dataStreamer(@Nullable String cacheName) { - return dataStreamer(cacheName, true); - } - - /** - * @param nodeId Sender ID. - * @param req Request. - */ - private void processDataLoadRequest(UUID nodeId, GridDataLoadRequest req) { - if (!busyLock.enterBusy()) { - if (log.isDebugEnabled()) - log.debug("Ignoring data load request (node is stopping): " + req); - - return; - } - - try { - if (log.isDebugEnabled()) - log.debug("Processing data load request: " + req); - - Object topic; - - try { - topic = marsh.unmarshal(req.responseTopicBytes(), null); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to unmarshal topic from request: " + req, e); - - return; - } - - ClassLoader clsLdr; - - if (req.forceLocalDeployment()) - clsLdr = U.gridClassLoader(); - else { - GridDeployment dep = ctx.deploy().getGlobalDeployment( - req.deploymentMode(), - req.sampleClassName(), - req.sampleClassName(), - req.userVersion(), - nodeId, - req.classLoaderId(), - req.participants(), - null); - - if (dep == null) { - sendResponse(nodeId, - topic, - req.requestId(), - new IgniteCheckedException("Failed to get deployment for request [sndId=" + nodeId + - ", req=" + req + ']'), - false); - - return; - } - - clsLdr = dep.classLoader(); - } - - Collection<Map.Entry<K, V>> col; - IgniteDataStreamer.Updater<K, V> updater; - - try { - col = marsh.unmarshal(req.collectionBytes(), clsLdr); - updater = marsh.unmarshal(req.updaterBytes(), clsLdr); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to unmarshal message [nodeId=" + nodeId + ", req=" + req + ']', e); - - sendResponse(nodeId, topic, req.requestId(), e, false); - - return; - } - - IgniteDataStreamerUpdateJob<K, V> job = new IgniteDataStreamerUpdateJob<>(ctx, - log, - req.cacheName(), - col, - req.ignoreDeploymentOwnership(), - req.skipStore(), - updater); - - Exception err = null; - - try { - job.call(); - } - catch (Exception e) { - U.error(log, "Failed to finish update job.", e); - - err = e; - } - - sendResponse(nodeId, topic, req.requestId(), err, req.forceLocalDeployment()); - } - finally { - busyLock.leaveBusy(); - } - } - - /** - * @param nodeId Node ID. - * @param resTopic Response topic. - * @param reqId Request ID. - * @param err Error. - * @param forceLocDep Force local deployment. - */ - private void sendResponse(UUID nodeId, Object resTopic, long reqId, @Nullable Throwable err, - boolean forceLocDep) { - byte[] errBytes; - - try { - errBytes = err != null ? marsh.marshal(err) : null; - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to marshal message.", e); - - return; - } - - GridDataLoadResponse res = new GridDataLoadResponse(reqId, errBytes, forceLocDep); - - try { - ctx.io().send(nodeId, resTopic, res, PUBLIC_POOL); - } - catch (IgniteCheckedException e) { - if (ctx.discovery().alive(nodeId)) - U.error(log, "Failed to respond to node [nodeId=" + nodeId + ", res=" + res + ']', e); - else if (log.isDebugEnabled()) - log.debug("Node has left the grid: " + nodeId); - } - } - - /** {@inheritDoc} */ - @Override public void printMemoryStats() { - X.println(">>>"); - X.println(">>> Data streamer processor memory stats [grid=" + ctx.gridName() + ']'); - X.println(">>> ldrsSize: " + ldrs.size()); - } -}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerUpdateJob.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerUpdateJob.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerUpdateJob.java deleted file mode 100644 index 1a3db40..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerUpdateJob.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.dataload; - -import org.apache.ignite.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.processors.cache.*; -import org.apache.ignite.internal.util.lang.*; -import org.jetbrains.annotations.*; - -import java.util.*; - -/** - * Job to put entries to cache on affinity node. - */ -class IgniteDataStreamerUpdateJob<K, V> implements GridPlainCallable<Object> { - /** */ - private final GridKernalContext ctx; - - /** */ - private final IgniteLogger log; - - /** Cache name. */ - private final String cacheName; - - /** Entries to put. */ - private final Collection<Map.Entry<K, V>> col; - - /** {@code True} to ignore deployment ownership. */ - private final boolean ignoreDepOwnership; - - /** */ - private final boolean skipStore; - - /** */ - private final IgniteDataStreamer.Updater<K, V> updater; - - /** - * @param ctx Context. - * @param log Log. - * @param cacheName Cache name. - * @param col Entries to put. - * @param ignoreDepOwnership {@code True} to ignore deployment ownership. - * @param updater Updater. - */ - IgniteDataStreamerUpdateJob( - GridKernalContext ctx, - IgniteLogger log, - @Nullable String cacheName, - Collection<Map.Entry<K, V>> col, - boolean ignoreDepOwnership, - boolean skipStore, - IgniteDataStreamer.Updater<K, V> updater) { - this.ctx = ctx; - this.log = log; - - assert col != null && !col.isEmpty(); - assert updater != null; - - this.cacheName = cacheName; - this.col = col; - this.ignoreDepOwnership = ignoreDepOwnership; - this.skipStore = skipStore; - this.updater = updater; - } - - /** {@inheritDoc} */ - @Override public Object call() throws Exception { - if (log.isDebugEnabled()) - log.debug("Running put job [nodeId=" + ctx.localNodeId() + ", size=" + col.size() + ']'); - -// TODO IGNITE-77: restore adapter usage. -// GridCacheAdapter<Object, Object> cache = ctx.cache().internalCache(cacheName); -// -// IgniteFuture<?> f = cache.context().preloader().startFuture(); -// -// if (!f.isDone()) -// f.get(); -// -// if (ignoreDepOwnership) -// cache.context().deploy().ignoreOwnership(true); - - IgniteCacheProxy<K, V> cache = ctx.cache().jcache(cacheName); - - if (skipStore) - cache = (IgniteCacheProxy<K, V>)cache.withSkipStore(); - - if (ignoreDepOwnership) - cache.context().deploy().ignoreOwnership(true); - - try { - updater.update(cache, col); - - return null; - } - finally { - if (ignoreDepOwnership) - cache.context().deploy().ignoreOwnership(false); - - if (log.isDebugEnabled()) - log.debug("Update job finished on node: " + ctx.localNodeId()); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/package.html ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/package.html b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/package.html deleted file mode 100644 index 1090b86..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/package.html +++ /dev/null @@ -1,24 +0,0 @@ -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> - -<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> -<html> -<body> - <!-- Package description. --> - Data streamer processor. -</body> -</html> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/GridDataLoadRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/GridDataLoadRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/GridDataLoadRequest.java new file mode 100644 index 0000000..d77b52e --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/GridDataLoadRequest.java @@ -0,0 +1,450 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.datastream; + +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.plugin.extensions.communication.*; +import org.jetbrains.annotations.*; + +import java.nio.*; +import java.util.*; + +/** + * + */ +public class GridDataLoadRequest implements Message { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private long reqId; + + /** */ + private byte[] resTopicBytes; + + /** Cache name. */ + private String cacheName; + + /** */ + private byte[] updaterBytes; + + /** Entries to put. */ + private byte[] colBytes; + + /** {@code True} to ignore deployment ownership. */ + private boolean ignoreDepOwnership; + + /** */ + private boolean skipStore; + + /** */ + private DeploymentMode depMode; + + /** */ + private String sampleClsName; + + /** */ + private String userVer; + + /** Node class loader participants. */ + @GridToStringInclude + @GridDirectMap(keyType = UUID.class, valueType = IgniteUuid.class) + private Map<UUID, IgniteUuid> ldrParticipants; + + /** */ + private IgniteUuid clsLdrId; + + /** */ + private boolean forceLocDep; + + /** + * {@code Externalizable} support. + */ + public GridDataLoadRequest() { + // No-op. + } + + /** + * @param reqId Request ID. + * @param resTopicBytes Response topic. + * @param cacheName Cache name. + * @param updaterBytes Cache updater. + * @param colBytes Collection bytes. + * @param ignoreDepOwnership Ignore ownership. + * @param skipStore Skip store flag. + * @param depMode Deployment mode. + * @param sampleClsName Sample class name. + * @param userVer User version. + * @param ldrParticipants Loader participants. + * @param clsLdrId Class loader ID. + * @param forceLocDep Force local deployment. + */ + public GridDataLoadRequest(long reqId, + byte[] resTopicBytes, + @Nullable String cacheName, + byte[] updaterBytes, + byte[] colBytes, + boolean ignoreDepOwnership, + boolean skipStore, + DeploymentMode depMode, + String sampleClsName, + String userVer, + Map<UUID, IgniteUuid> ldrParticipants, + IgniteUuid clsLdrId, + boolean forceLocDep) { + this.reqId = reqId; + this.resTopicBytes = resTopicBytes; + this.cacheName = cacheName; + this.updaterBytes = updaterBytes; + this.colBytes = colBytes; + this.ignoreDepOwnership = ignoreDepOwnership; + this.skipStore = skipStore; + this.depMode = depMode; + this.sampleClsName = sampleClsName; + this.userVer = userVer; + this.ldrParticipants = ldrParticipants; + this.clsLdrId = clsLdrId; + this.forceLocDep = forceLocDep; + } + + /** + * @return Request ID. + */ + public long requestId() { + return reqId; + } + + /** + * @return Response topic. + */ + public byte[] responseTopicBytes() { + return resTopicBytes; + } + + /** + * @return Cache name. + */ + public String cacheName() { + return cacheName; + } + + /** + * @return Updater. + */ + public byte[] updaterBytes() { + return updaterBytes; + } + + /** + * @return Collection bytes. + */ + public byte[] collectionBytes() { + return colBytes; + } + + /** + * @return {@code True} to ignore ownership. + */ + public boolean ignoreDeploymentOwnership() { + return ignoreDepOwnership; + } + + /** + * @return Skip store flag. + */ + public boolean skipStore() { + return skipStore; + } + + /** + * @return Deployment mode. + */ + public DeploymentMode deploymentMode() { + return depMode; + } + + /** + * @return Sample class name. + */ + public String sampleClassName() { + return sampleClsName; + } + + /** + * @return User version. + */ + public String userVersion() { + return userVer; + } + + /** + * @return Participants. + */ + public Map<UUID, IgniteUuid> participants() { + return ldrParticipants; + } + + /** + * @return Class loader ID. + */ + public IgniteUuid classLoaderId() { + return clsLdrId; + } + + /** + * @return {@code True} to force local deployment. + */ + public boolean forceLocalDeployment() { + return forceLocDep; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridDataLoadRequest.class, this); + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeString("cacheName", cacheName)) + return false; + + writer.incrementState(); + + case 1: + if (!writer.writeIgniteUuid("clsLdrId", clsLdrId)) + return false; + + writer.incrementState(); + + case 2: + if (!writer.writeByteArray("colBytes", colBytes)) + return false; + + writer.incrementState(); + + case 3: + if (!writer.writeByte("depMode", depMode != null ? (byte)depMode.ordinal() : -1)) + return false; + + writer.incrementState(); + + case 4: + if (!writer.writeBoolean("forceLocDep", forceLocDep)) + return false; + + writer.incrementState(); + + case 5: + if (!writer.writeBoolean("ignoreDepOwnership", ignoreDepOwnership)) + return false; + + writer.incrementState(); + + case 6: + if (!writer.writeMap("ldrParticipants", ldrParticipants, MessageCollectionItemType.UUID, MessageCollectionItemType.IGNITE_UUID)) + return false; + + writer.incrementState(); + + case 7: + if (!writer.writeLong("reqId", reqId)) + return false; + + writer.incrementState(); + + case 8: + if (!writer.writeByteArray("resTopicBytes", resTopicBytes)) + return false; + + writer.incrementState(); + + case 9: + if (!writer.writeString("sampleClsName", sampleClsName)) + return false; + + writer.incrementState(); + + case 10: + if (!writer.writeBoolean("skipStore", skipStore)) + return false; + + writer.incrementState(); + + case 11: + if (!writer.writeByteArray("updaterBytes", updaterBytes)) + return false; + + writer.incrementState(); + + case 12: + if (!writer.writeString("userVer", userVer)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + cacheName = reader.readString("cacheName"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 1: + clsLdrId = reader.readIgniteUuid("clsLdrId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 2: + colBytes = reader.readByteArray("colBytes"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 3: + byte depModeOrd; + + depModeOrd = reader.readByte("depMode"); + + if (!reader.isLastRead()) + return false; + + depMode = DeploymentMode.fromOrdinal(depModeOrd); + + reader.incrementState(); + + case 4: + forceLocDep = reader.readBoolean("forceLocDep"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 5: + ignoreDepOwnership = reader.readBoolean("ignoreDepOwnership"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 6: + ldrParticipants = reader.readMap("ldrParticipants", MessageCollectionItemType.UUID, MessageCollectionItemType.IGNITE_UUID, false); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 7: + reqId = reader.readLong("reqId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 8: + resTopicBytes = reader.readByteArray("resTopicBytes"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 9: + sampleClsName = reader.readString("sampleClsName"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 10: + skipStore = reader.readBoolean("skipStore"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 11: + updaterBytes = reader.readByteArray("updaterBytes"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 12: + userVer = reader.readString("userVer"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return 62; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 13; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/GridDataLoadResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/GridDataLoadResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/GridDataLoadResponse.java new file mode 100644 index 0000000..25ff9ce --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/GridDataLoadResponse.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.datastream; + +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.plugin.extensions.communication.*; + +import java.nio.*; + +/** + * + */ +public class GridDataLoadResponse implements Message { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private long reqId; + + /** */ + private byte[] errBytes; + + /** */ + private boolean forceLocDep; + + /** + * @param reqId Request ID. + * @param errBytes Error bytes. + * @param forceLocDep Force local deployment. + */ + public GridDataLoadResponse(long reqId, byte[] errBytes, boolean forceLocDep) { + this.reqId = reqId; + this.errBytes = errBytes; + this.forceLocDep = forceLocDep; + } + + /** + * {@code Externalizable} support. + */ + public GridDataLoadResponse() { + // No-op. + } + + /** + * @return Request ID. + */ + public long requestId() { + return reqId; + } + + /** + * @return Error bytes. + */ + public byte[] errorBytes() { + return errBytes; + } + + /** + * @return {@code True} to force local deployment. + */ + public boolean forceLocalDeployment() { + return forceLocDep; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridDataLoadResponse.class, this); + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeByteArray("errBytes", errBytes)) + return false; + + writer.incrementState(); + + case 1: + if (!writer.writeBoolean("forceLocDep", forceLocDep)) + return false; + + writer.incrementState(); + + case 2: + if (!writer.writeLong("reqId", reqId)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + errBytes = reader.readByteArray("errBytes"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 1: + forceLocDep = reader.readBoolean("forceLocDep"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 2: + reqId = reader.readLong("reqId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return 63; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 3; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerCacheUpdaters.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerCacheUpdaters.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerCacheUpdaters.java new file mode 100644 index 0000000..629c7b1 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerCacheUpdaters.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.datastream; + +import org.apache.ignite.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.util.typedef.*; +import org.jetbrains.annotations.*; + +import java.util.*; + +/** + * Bundled factory for cache updaters. + */ +public class IgniteDataStreamerCacheUpdaters { + /** */ + private static final IgniteDataStreamer.Updater INDIVIDUAL = new Individual(); + + /** */ + private static final IgniteDataStreamer.Updater BATCHED = new Batched(); + + /** */ + private static final IgniteDataStreamer.Updater BATCHED_SORTED = new BatchedSorted(); + + /** + * Updates cache using independent {@link org.apache.ignite.cache.GridCache#put(Object, Object, org.apache.ignite.lang.IgnitePredicate[])} and + * {@link org.apache.ignite.cache.GridCache#remove(Object, org.apache.ignite.lang.IgnitePredicate[])} operations. Thus it is safe from deadlocks but performance + * is not the best. + * + * @return Single updater. + */ + public static <K, V> IgniteDataStreamer.Updater<K, V> individual() { + return INDIVIDUAL; + } + + /** + * Updates cache using batched methods {@link org.apache.ignite.cache.GridCache#putAll(Map, org.apache.ignite.lang.IgnitePredicate[])} and + * {@link org.apache.ignite.cache.GridCache#removeAll(Collection, org.apache.ignite.lang.IgnitePredicate[])}. Can cause deadlocks if the same keys are getting + * updated concurrently. Performance is generally better than in {@link #individual()}. + * + * @return Batched updater. + */ + public static <K, V> IgniteDataStreamer.Updater<K, V> batched() { + return BATCHED; + } + + /** + * Updates cache using batched methods {@link org.apache.ignite.cache.GridCache#putAll(Map, org.apache.ignite.lang.IgnitePredicate[])} and + * {@link org.apache.ignite.cache.GridCache#removeAll(Collection, org.apache.ignite.lang.IgnitePredicate[])}. Keys are sorted in natural order and if all updates + * use the same rule deadlock can not happen. Performance is generally better than in {@link #individual()}. + * + * @return Batched sorted updater. + */ + public static <K extends Comparable<?>, V> IgniteDataStreamer.Updater<K, V> batchedSorted() { + return BATCHED_SORTED; + } + + /** + * Updates cache. + * + * @param cache Cache. + * @param rmvCol Keys to remove. + * @param putMap Entries to put. + * @throws IgniteException If failed. + */ + protected static <K, V> void updateAll(IgniteCache<K, V> cache, @Nullable Set<K> rmvCol, + Map<K, V> putMap) { + assert rmvCol != null || putMap != null; + + // Here we assume that there are no key duplicates, so the following calls are valid. + if (rmvCol != null) + ((IgniteCacheProxy<K, V>)cache).removeAll(rmvCol); + + if (putMap != null) + cache.putAll(putMap); + } + + /** + * Simple cache updater implementation. Updates keys one by one thus is not dead lock prone. + */ + private static class Individual<K, V> implements IgniteDataStreamer.Updater<K, V> { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) { + assert cache != null; + assert !F.isEmpty(entries); + + for (Map.Entry<K, V> entry : entries) { + K key = entry.getKey(); + + assert key != null; + + V val = entry.getValue(); + + if (val == null) + cache.remove(key); + else + cache.put(key, val); + } + } + } + + /** + * Batched updater. Updates cache using batch operations thus is dead lock prone. + */ + private static class Batched<K, V> implements IgniteDataStreamer.Updater<K, V> { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) { + assert cache != null; + assert !F.isEmpty(entries); + + Map<K, V> putAll = null; + Set<K> rmvAll = null; + + for (Map.Entry<K, V> entry : entries) { + K key = entry.getKey(); + + assert key != null; + + V val = entry.getValue(); + + if (val == null) { + if (rmvAll == null) + rmvAll = new HashSet<>(); + + rmvAll.add(key); + } + else { + if (putAll == null) + putAll = new HashMap<>(); + + putAll.put(key, val); + } + } + + updateAll(cache, rmvAll, putAll); + } + } + + /** + * Batched updater. Updates cache using batch operations thus is dead lock prone. + */ + private static class BatchedSorted<K, V> implements IgniteDataStreamer.Updater<K, V> { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) { + assert cache != null; + assert !F.isEmpty(entries); + + Map<K, V> putAll = null; + Set<K> rmvAll = null; + + for (Map.Entry<K, V> entry : entries) { + K key = entry.getKey(); + + assert key instanceof Comparable; + + V val = entry.getValue(); + + if (val == null) { + if (rmvAll == null) + rmvAll = new TreeSet<>(); + + rmvAll.add(key); + } + else { + if (putAll == null) + putAll = new TreeMap<>(); + + putAll.put(key, val); + } + } + + updateAll(cache, rmvAll, putAll); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerFuture.java new file mode 100644 index 0000000..b6aa15c --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerFuture.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.datastream; + +import org.apache.ignite.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.util.future.*; +import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.io.*; + +/** + * Data streamer future. + */ +class IgniteDataStreamerFuture extends GridFutureAdapter<Object> { + /** */ + private static final long serialVersionUID = 0L; + + /** Data streamer. */ + @GridToStringExclude + private IgniteDataStreamerImpl dataLdr; + + /** + * Default constructor for {@link Externalizable} support. + */ + public IgniteDataStreamerFuture() { + // No-op. + } + + /** + * @param ctx Context. + * @param dataLdr Data streamer. + */ + IgniteDataStreamerFuture(GridKernalContext ctx, IgniteDataStreamerImpl dataLdr) { + super(ctx); + + assert dataLdr != null; + + this.dataLdr = dataLdr; + } + + /** {@inheritDoc} */ + @Override public boolean cancel() throws IgniteCheckedException { + checkValid(); + + if (onCancelled()) { + dataLdr.closeEx(true); + + return true; + } + + return false; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IgniteDataStreamerFuture.class, this, super.toString()); + } +}