http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2c679e5/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/DataStreamerProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/DataStreamerProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/DataStreamerProcessor.java
new file mode 100644
index 0000000..ee12f7a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/DataStreamerProcessor.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastream;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.managers.communication.*;
+import org.apache.ignite.internal.managers.deployment.*;
+import org.apache.ignite.internal.processors.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.internal.util.worker.*;
+import org.apache.ignite.marshaller.*;
+import org.apache.ignite.thread.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.internal.GridTopic.*;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
+
+/**
+ * Processor that creates data streamers, re-flushes them from a background thread and applies
+ * {@link DataStreamerRequest} batches received on {@code TOPIC_DATASTREAM}.
+ */
+public class DataStreamerProcessor<K, V> extends GridProcessorAdapter {
+    /** Active data streamers (access is not supposed to be highly concurrent). */
+    private final Collection<DataStreamerImpl<K, V>> ldrs = new GridConcurrentHashSet<>();
+
+    /** Busy lock. */
+    private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
+
+    /** Flushing thread. */
+    private Thread flusher;
+
+    /** Queue the flusher thread takes streamers from. */
+    private final DelayQueue<DataStreamerImpl<K, V>> flushQ = new DelayQueue<>();
+
+    /** Marshaller. */
+    private final Marshaller marsh;
+
+    /**
+     * Registers the listener for incoming data streamer requests.
+     *
+     * @param ctx Kernal context.
+     */
+    public DataStreamerProcessor(GridKernalContext ctx) {
+        super(ctx);
+
+        ctx.io().addMessageListener(TOPIC_DATASTREAM, new GridMessageListener() {
+            @Override public void onMessage(UUID nodeId, Object msg) {
+                assert msg instanceof DataStreamerRequest;
+
+                processRequest(nodeId, (DataStreamerRequest)msg);
+            }
+        });
+
+        marsh = ctx.config().getMarshaller();
+    }
+
+    /**
+     * Starts the flusher thread (skipped on daemon nodes).
+     *
+     * {@inheritDoc}
+     */
+    @Override public void start() throws IgniteCheckedException {
+        if (ctx.config().isDaemon())
+            return;
+
+        flusher = new IgniteThread(new GridWorker(ctx.gridName(), "grid-data-loader-flusher", log) {
+            @Override protected void body() throws InterruptedException {
+                while (!isCancelled()) {
+                    DataStreamerImpl<K, V> ldr = flushQ.take();
+
+                    // Processor is stopping: leave the loop without touching the streamer.
+                    if (!busyLock.enterBusy())
+                        return;
+
+                    try {
+                        // Closed streamers are dropped from the queue (not re-offered).
+                        if (ldr.isClosed())
+                            continue;
+
+                        ldr.tryFlush();
+
+                        flushQ.offer(ldr);
+                    }
+                    finally {
+                        busyLock.leaveBusy();
+                    }
+                }
+            }
+        });
+
+        flusher.start();
+
+        if (log.isDebugEnabled())
+            log.debug("Started data streamer processor.");
+    }
+
+    /**
+     * Removes the request listener, blocks the busy lock, stops the flusher thread and closes
+     * every streamer that is still registered (skipped on daemon nodes).
+     *
+     * {@inheritDoc}
+     */
+    @Override public void onKernalStop(boolean cancel) {
+        if (ctx.config().isDaemon())
+            return;
+
+        ctx.io().removeMessageListener(TOPIC_DATASTREAM);
+
+        busyLock.block();
+
+        U.interrupt(flusher);
+        U.join(flusher, log);
+
+        for (DataStreamerImpl<?, ?> ldr : ldrs) {
+            if (log.isDebugEnabled())
+                log.debug("Closing active data streamer on grid stop [ldr=" + ldr + ", cancel=" + cancel + ']');
+
+            try {
+                ldr.closeEx(cancel);
+            }
+            catch (IgniteInterruptedCheckedException e) {
+                U.warn(log, "Interrupted while waiting for completion of the data streamer: " + ldr, e);
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to close data streamer: " + ldr, e);
+            }
+        }
+
+        if (log.isDebugEnabled())
+            log.debug("Stopped data streamer processor.");
+    }
+
+    /**
+     * Creates a new data streamer and tracks it until its internal future completes.
+     *
+     * @param cacheName Cache name ({@code null} for default cache).
+     * @return Data streamer.
+     * @throws IllegalStateException If the processor is stopping.
+     */
+    public DataStreamerImpl<K, V> dataStreamer(@Nullable String cacheName) {
+        if (!busyLock.enterBusy())
+            throw new IllegalStateException("Failed to create data streamer (grid is stopping).");
+
+        try {
+            final DataStreamerImpl<K, V> ldr = new DataStreamerImpl<>(ctx, cacheName, flushQ);
+
+            ldrs.add(ldr);
+
+            // Unregister the streamer once its internal future is done.
+            ldr.internalFuture().listen(new CI1<IgniteInternalFuture<?>>() {
+                @Override public void apply(IgniteInternalFuture<?> f) {
+                    boolean b = ldrs.remove(ldr);
+
+                    assert b : "Loader has not been added to set: " + ldr;
+
+                    if (log.isDebugEnabled())
+                        log.debug("Loader has been completed: " + ldr);
+                }
+            });
+
+            return ldr;
+        }
+        finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * Applies a batch received from another node: unmarshals the response topic and the updater,
+     * runs a {@link DataStreamerUpdateJob} and sends the outcome back to the sender.
+     *
+     * @param nodeId Sender ID.
+     * @param req Request.
+     */
+    private void processRequest(UUID nodeId, DataStreamerRequest req) {
+        if (!busyLock.enterBusy()) {
+            if (log.isDebugEnabled())
+                log.debug("Ignoring data load request (node is stopping): " + req);
+
+            return;
+        }
+
+        try {
+            if (log.isDebugEnabled())
+                log.debug("Processing data load request: " + req);
+
+            Object topic;
+
+            try {
+                topic = marsh.unmarshal(req.responseTopicBytes(), null);
+            }
+            catch (IgniteCheckedException e) {
+                // Without the topic there is nowhere to send a response to.
+                U.error(log, "Failed to unmarshal topic from request: " + req, e);
+
+                return;
+            }
+
+            ClassLoader clsLdr;
+
+            if (req.forceLocalDeployment())
+                clsLdr = U.gridClassLoader();
+            else {
+                GridDeployment dep = ctx.deploy().getGlobalDeployment(
+                    req.deploymentMode(),
+                    req.sampleClassName(),
+                    req.sampleClassName(),
+                    req.userVersion(),
+                    nodeId,
+                    req.classLoaderId(),
+                    req.participants(),
+                    null);
+
+                if (dep == null) {
+                    sendResponse(nodeId,
+                        topic,
+                        req.requestId(),
+                        new IgniteCheckedException("Failed to get deployment for request [sndId=" + nodeId +
+                            ", req=" + req + ']'),
+                        false);
+
+                    return;
+                }
+
+                clsLdr = dep.classLoader();
+            }
+
+            IgniteDataStreamer.Updater<K, V> updater;
+
+            try {
+                updater = marsh.unmarshal(req.updaterBytes(), clsLdr);
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to unmarshal message [nodeId=" + nodeId + ", req=" + req + ']', e);
+
+                sendResponse(nodeId, topic, req.requestId(), e, false);
+
+                return;
+            }
+
+            Collection<DataStreamerEntry> col = req.entries();
+
+            DataStreamerUpdateJob job = new DataStreamerUpdateJob(ctx,
+                log,
+                req.cacheName(),
+                col,
+                req.ignoreDeploymentOwnership(),
+                req.skipStore(),
+                updater);
+
+            Exception err = null;
+
+            try {
+                job.call();
+            }
+            catch (Exception e) {
+                U.error(log, "Failed to finish update job.", e);
+
+                err = e;
+            }
+
+            sendResponse(nodeId, topic, req.requestId(), err, req.forceLocalDeployment());
+        }
+        finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * Sends a {@link DataStreamerResponse} for the given request. If the error itself cannot be
+     * marshalled, a plain {@link IgniteCheckedException} describing it is sent instead, so that
+     * the requesting node still receives a failure response for this request ID.
+     *
+     * @param nodeId Node ID.
+     * @param resTopic Response topic.
+     * @param reqId Request ID.
+     * @param err Error ({@code null} if the request succeeded).
+     * @param forceLocDep Force local deployment.
+     */
+    private void sendResponse(UUID nodeId, Object resTopic, long reqId, @Nullable Throwable err,
+        boolean forceLocDep) {
+        byte[] errBytes;
+
+        try {
+            errBytes = err != null ? marsh.marshal(err) : null;
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to marshal message.", e);
+
+            // Returning here would leave the request unanswered; report a marshallable substitute.
+            try {
+                errBytes = marsh.marshal(new IgniteCheckedException(
+                    "Failed to marshal original error on remote node (see remote node log): " + err));
+            }
+            catch (IgniteCheckedException e0) {
+                U.error(log, "Failed to marshal fallback error.", e0);
+
+                return;
+            }
+        }
+
+        DataStreamerResponse res = new DataStreamerResponse(reqId, errBytes, forceLocDep);
+
+        try {
+            ctx.io().send(nodeId, resTopic, res, PUBLIC_POOL);
+        }
+        catch (IgniteCheckedException e) {
+            // A send failure to a node that already left is expected; log it at debug level only.
+            if (ctx.discovery().alive(nodeId))
+                U.error(log, "Failed to respond to node [nodeId=" + nodeId + ", res=" + res + ']', e);
+            else if (log.isDebugEnabled())
+                log.debug("Node has left the grid: " + nodeId);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void printMemoryStats() {
+        X.println(">>>");
+        X.println(">>> Data streamer processor memory stats [grid=" + ctx.gridName() + ']');
+        X.println(">>>   ldrsSize: " + ldrs.size());
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2c679e5/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/DataStreamerRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/DataStreamerRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/DataStreamerRequest.java
new file mode 100644
index 0000000..1f85ee6
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/DataStreamerRequest.java
@@ -0,0 +1,451 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastream;
+
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.plugin.extensions.communication.*;
+import org.jetbrains.annotations.*;
+
+import java.nio.*;
+import java.util.*;
+
+/**
+ * Message with a batch of cache entries, the marshalled updater and the deployment info needed to apply them.
+ */
+public class DataStreamerRequest implements Message {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Request ID. */
+    private long reqId;
+
+    /** Marshalled response topic. */
+    private byte[] resTopicBytes;
+
+    /** Cache name. */
+    private String cacheName;
+
+    /** Marshalled cache updater. */
+    private byte[] updaterBytes;
+
+    /** Entries to update. */
+    @GridDirectCollection(DataStreamerEntry.class)
+    private Collection<DataStreamerEntry> entries;
+
+    /** {@code True} to ignore deployment ownership. */
+    private boolean ignoreDepOwnership;
+
+    /** Skip store flag. */
+    private boolean skipStore;
+
+    /** Deployment mode. */
+    private DeploymentMode depMode;
+
+    /** Sample class name. */
+    private String sampleClsName;
+
+    /** User version. */
+    private String userVer;
+
+    /** Node class loader participants. */
+    @GridToStringInclude
+    @GridDirectMap(keyType = UUID.class, valueType = IgniteUuid.class)
+    private Map<UUID, IgniteUuid> ldrParticipants;
+
+    /** Class loader ID. */
+    private IgniteUuid clsLdrId;
+
+    /** {@code True} to force local deployment. */
+    private boolean forceLocDep;
+
+    /**
+     * {@code Externalizable} support.
+     */
+    public DataStreamerRequest() {
+        // No-op.
+    }
+
+    /**
+     * @param reqId Request ID.
+     * @param resTopicBytes Response topic.
+     * @param cacheName Cache name.
+     * @param updaterBytes Cache updater.
+     * @param entries Entries to put.
+     * @param ignoreDepOwnership Ignore ownership.
+     * @param skipStore Skip store flag.
+     * @param depMode Deployment mode.
+     * @param sampleClsName Sample class name.
+     * @param userVer User version.
+     * @param ldrParticipants Loader participants.
+     * @param clsLdrId Class loader ID.
+     * @param forceLocDep Force local deployment.
+     */
+    public DataStreamerRequest(long reqId,
+        byte[] resTopicBytes,
+        @Nullable String cacheName,
+        byte[] updaterBytes,
+        Collection<DataStreamerEntry> entries,
+        boolean ignoreDepOwnership,
+        boolean skipStore,
+        DeploymentMode depMode,
+        String sampleClsName,
+        String userVer,
+        Map<UUID, IgniteUuid> ldrParticipants,
+        IgniteUuid clsLdrId,
+        boolean forceLocDep) {
+        this.reqId = reqId;
+        this.resTopicBytes = resTopicBytes;
+        this.cacheName = cacheName;
+        this.updaterBytes = updaterBytes;
+        this.entries = entries;
+        this.ignoreDepOwnership = ignoreDepOwnership;
+        this.skipStore = skipStore;
+        this.depMode = depMode;
+        this.sampleClsName = sampleClsName;
+        this.userVer = userVer;
+        this.ldrParticipants = ldrParticipants;
+        this.clsLdrId = clsLdrId;
+        this.forceLocDep = forceLocDep;
+    }
+
+    /**
+     * @return Request ID.
+     */
+    public long requestId() {
+        return reqId;
+    }
+
+    /**
+     * @return Response topic.
+     */
+    public byte[] responseTopicBytes() {
+        return resTopicBytes;
+    }
+
+    /**
+     * @return Cache name.
+     */
+    public String cacheName() {
+        return cacheName;
+    }
+
+    /**
+     * @return Updater.
+     */
+    public byte[] updaterBytes() {
+        return updaterBytes;
+    }
+
+    /**
+     * @return Entries to update.
+     */
+    public Collection<DataStreamerEntry> entries() {
+        return entries;
+    }
+
+    /**
+     * @return {@code True} to ignore ownership.
+     */
+    public boolean ignoreDeploymentOwnership() {
+        return ignoreDepOwnership;
+    }
+
+    /**
+     * @return Skip store flag.
+     */
+    public boolean skipStore() {
+        return skipStore;
+    }
+
+    /**
+     * @return Deployment mode.
+     */
+    public DeploymentMode deploymentMode() {
+        return depMode;
+    }
+
+    /**
+     * @return Sample class name.
+     */
+    public String sampleClassName() {
+        return sampleClsName;
+    }
+
+    /**
+     * @return User version.
+     */
+    public String userVersion() {
+        return userVer;
+    }
+
+    /**
+     * @return Participants.
+     */
+    public Map<UUID, IgniteUuid> participants() {
+        return ldrParticipants;
+    }
+
+    /**
+     * @return Class loader ID.
+     */
+    public IgniteUuid classLoaderId() {
+        return clsLdrId;
+    }
+
+    /**
+     * @return {@code True} to force local deployment.
+     */
+    public boolean forceLocalDeployment() {
+        return forceLocDep;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(DataStreamerRequest.class, this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) { // No breaks: cases fall through from the current state to the last field.
+            case 0:
+                if (!writer.writeString("cacheName", cacheName))
+                    return false;
+
+                writer.incrementState();
+
+            case 1:
+                if (!writer.writeIgniteUuid("clsLdrId", clsLdrId))
+                    return false;
+
+                writer.incrementState();
+
+            case 2:
+                if (!writer.writeByte("depMode", depMode != null ? (byte)depMode.ordinal() : -1)) // -1 is written for null.
+                    return false;
+
+                writer.incrementState();
+
+            case 3:
+                if (!writer.writeCollection("entries", entries, MessageCollectionItemType.MSG))
+                    return false;
+
+                writer.incrementState();
+
+            case 4:
+                if (!writer.writeBoolean("forceLocDep", forceLocDep))
+                    return false;
+
+                writer.incrementState();
+
+            case 5:
+                if (!writer.writeBoolean("ignoreDepOwnership", ignoreDepOwnership))
+                    return false;
+
+                writer.incrementState();
+
+            case 6:
+                if (!writer.writeMap("ldrParticipants", ldrParticipants, MessageCollectionItemType.UUID, MessageCollectionItemType.IGNITE_UUID))
+                    return false;
+
+                writer.incrementState();
+
+            case 7:
+                if (!writer.writeLong("reqId", reqId))
+                    return false;
+
+                writer.incrementState();
+
+            case 8:
+                if (!writer.writeByteArray("resTopicBytes", resTopicBytes))
+                    return false;
+
+                writer.incrementState();
+
+            case 9:
+                if (!writer.writeString("sampleClsName", sampleClsName))
+                    return false;
+
+                writer.incrementState();
+
+            case 10:
+                if (!writer.writeBoolean("skipStore", skipStore))
+                    return false;
+
+                writer.incrementState();
+
+            case 11:
+                if (!writer.writeByteArray("updaterBytes", updaterBytes))
+                    return false;
+
+                writer.incrementState();
+
+            case 12:
+                if (!writer.writeString("userVer", userVer))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) { // No breaks: fields are read in the same order writeTo() writes them.
+            case 0:
+                cacheName = reader.readString("cacheName");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 1:
+                clsLdrId = reader.readIgniteUuid("clsLdrId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 2:
+                byte depModeOrd;
+
+                depModeOrd = reader.readByte("depMode");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                depMode = DeploymentMode.fromOrdinal(depModeOrd);
+
+                reader.incrementState();
+
+            case 3:
+                entries = reader.readCollection("entries", MessageCollectionItemType.MSG);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 4:
+                forceLocDep = reader.readBoolean("forceLocDep");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 5:
+                ignoreDepOwnership = reader.readBoolean("ignoreDepOwnership");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 6:
+                ldrParticipants = reader.readMap("ldrParticipants", MessageCollectionItemType.UUID, MessageCollectionItemType.IGNITE_UUID, false);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 7:
+                reqId = reader.readLong("reqId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 8:
+                resTopicBytes = reader.readByteArray("resTopicBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 9:
+                sampleClsName = reader.readString("sampleClsName");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 10:
+                skipStore = reader.readBoolean("skipStore");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 11:
+                updaterBytes = reader.readByteArray("updaterBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 12:
+                userVer = reader.readString("userVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return 62;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 13; // One per case (0-12) in writeTo()/readFrom().
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2c679e5/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/DataStreamerResponse.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/DataStreamerResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/DataStreamerResponse.java
new file mode 100644
index 0000000..a953fb5
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/DataStreamerResponse.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastream;
+
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.plugin.extensions.communication.*;
+
+import java.nio.*;
+
+/**
+ * Message answering a data streamer request: request ID, optional marshalled error, local-deployment flag.
+ */
+public class DataStreamerResponse implements Message {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Request ID. */
+    private long reqId;
+
+    /** Error bytes. */
+    private byte[] errBytes;
+
+    /** {@code True} to force local deployment. */
+    private boolean forceLocDep;
+
+    /**
+     * @param reqId Request ID.
+     * @param errBytes Error bytes.
+     * @param forceLocDep Force local deployment.
+     */
+    public DataStreamerResponse(long reqId, byte[] errBytes, boolean forceLocDep) {
+        this.reqId = reqId;
+        this.errBytes = errBytes;
+        this.forceLocDep = forceLocDep;
+    }
+
+    /**
+     * {@code Externalizable} support.
+     */
+    public DataStreamerResponse() {
+        // No-op.
+    }
+
+    /**
+     * @return Request ID.
+     */
+    public long requestId() {
+        return reqId;
+    }
+
+    /**
+     * @return Error bytes.
+     */
+    public byte[] errorBytes() {
+        return errBytes;
+    }
+
+    /**
+     * @return {@code True} to force local deployment.
+     */
+    public boolean forceLocalDeployment() {
+        return forceLocDep;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(DataStreamerResponse.class, this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) { // No breaks: cases fall through from the current state to the last field.
+            case 0:
+                if (!writer.writeByteArray("errBytes", errBytes))
+                    return false;
+
+                writer.incrementState();
+
+            case 1:
+                if (!writer.writeBoolean("forceLocDep", forceLocDep))
+                    return false;
+
+                writer.incrementState();
+
+            case 2:
+                if (!writer.writeLong("reqId", reqId))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) { // No breaks: fields are read in the same order writeTo() writes them.
+            case 0:
+                errBytes = reader.readByteArray("errBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 1:
+                forceLocDep = reader.readBoolean("forceLocDep");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 2:
+                reqId = reader.readLong("reqId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return 63;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 3; // One per case (0-2) in writeTo()/readFrom().
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2c679e5/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/DataStreamerUpdateJob.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/DataStreamerUpdateJob.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/DataStreamerUpdateJob.java
new file mode 100644
index 0000000..a64a108
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/DataStreamerUpdateJob.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastream;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Job that hands a batch of data streamer entries to the supplied updater for a given cache.
+ */
+class DataStreamerUpdateJob implements GridPlainCallable<Object> {
+    /** Kernal context. */
+    private final GridKernalContext ctx;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Name of the cache to update. */
+    private final String cacheName;
+
+    /** Batch of entries to put. */
+    private final Collection<DataStreamerEntry> col;
+
+    /** Whether deployment ownership is ignored while the job runs. */
+    private final boolean ignoreDepOwnership;
+
+    /** Skip store flag. */
+    private final boolean skipStore;
+
+    /** Updater the batch is passed to. */
+    private final IgniteDataStreamer.Updater updater;
+
+    /**
+     * @param ctx Context.
+     * @param log Log.
+     * @param cacheName Cache name.
+     * @param col Entries to put (must be non-empty).
+     * @param ignoreDepOwnership {@code True} to ignore deployment ownership.
+     * @param skipStore Skip store flag.
+     * @param updater Updater (must be non-null).
+     */
+    DataStreamerUpdateJob(
+        GridKernalContext ctx,
+        IgniteLogger log,
+        @Nullable String cacheName,
+        Collection<DataStreamerEntry> col,
+        boolean ignoreDepOwnership,
+        boolean skipStore,
+        IgniteDataStreamer.Updater<?, ?> updater) {
+        assert col != null && !col.isEmpty();
+        assert updater != null;
+
+        this.ctx = ctx;
+        this.log = log;
+        this.cacheName = cacheName;
+        this.col = col;
+        this.ignoreDepOwnership = ignoreDepOwnership;
+        this.skipStore = skipStore;
+        this.updater = updater;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public Object call() throws Exception {
+        if (log.isDebugEnabled())
+            log.debug("Running put job [nodeId=" + ctx.localNodeId() + ", size=" + col.size() + ']');
+
+// TODO IGNITE-77: restore adapter usage.
+//        GridCacheAdapter<Object, Object> cache = ctx.cache().internalCache(cacheName);
+//
+//        IgniteFuture<?> f = cache.context().preloader().startFuture();
+//
+//        if (!f.isDone())
+//            f.get();
+//
+//        if (ignoreDepOwnership)
+//            cache.context().deploy().ignoreOwnership(true);
+
+        IgniteCacheProxy proxy = ctx.cache().jcache(cacheName);
+
+        if (skipStore)
+            proxy = (IgniteCacheProxy<?, ?>)proxy.withSkipStore();
+
+        if (ignoreDepOwnership)
+            proxy.context().deploy().ignoreOwnership(true);
+
+        try {
+            final GridCacheContext cctx = proxy.context();
+
+            finishUnmarshal(cctx);
+
+            // Internal updaters get the raw entries, any other updater gets a read-only Map.Entry view.
+            Collection<?> batch = col;
+
+            if (unwrapEntries()) {
+                batch = F.viewReadOnly(col, new C1<DataStreamerEntry, Map.Entry>() {
+                    @Override public Map.Entry apply(DataStreamerEntry entry) {
+                        return entry.toEntry(cctx);
+                    }
+                });
+            }
+
+            updater.update(proxy, batch);
+
+            return null;
+        }
+        finally {
+            // Undo the ownership override set before the try block.
+            if (ignoreDepOwnership)
+                proxy.context().deploy().ignoreOwnership(false);
+
+            if (log.isDebugEnabled())
+                log.debug("Update job finished on node: " + ctx.localNodeId());
+        }
+    }
+
+    /**
+     * Completes unmarshalling of every key and every non-null value in the batch.
+     *
+     * @param cctx Cache context supplying the cache object context and the global class loader.
+     * @throws IgniteCheckedException If unmarshalling of a key or value fails.
+     */
+    private void finishUnmarshal(GridCacheContext cctx) throws IgniteCheckedException {
+        for (DataStreamerEntry entry : col) {
+            entry.getKey().finishUnmarshal(cctx.cacheObjectContext(), cctx.deploy().globalLoader());
+
+            CacheObject val = entry.getValue();
+
+            if (val == null)
+                continue;
+
+            val.finishUnmarshal(cctx.cacheObjectContext(), cctx.deploy().globalLoader());
+        }
+    }
+
+    /**
+     * @return {@code True} if need to unwrap internal entries.
+     */
+    private boolean unwrapEntries() {
+        if (updater instanceof DataStreamerCacheUpdaters.InternalUpdater)
+            return false;
+
+        return true;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2c679e5/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/GridDataLoadRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/GridDataLoadRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/GridDataLoadRequest.java
deleted file mode 100644
index 8c66812..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/GridDataLoadRequest.java
+++ /dev/null
@@ -1,451 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */ - -package org.apache.ignite.internal.processors.datastream; - -import org.apache.ignite.configuration.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.util.tostring.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.plugin.extensions.communication.*; -import org.jetbrains.annotations.*; - -import java.nio.*; -import java.util.*; - -/** - * - */ -public class GridDataLoadRequest implements Message { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private long reqId; - - /** */ - private byte[] resTopicBytes; - - /** Cache name. */ - private String cacheName; - - /** */ - private byte[] updaterBytes; - - /** Entries to update. */ - @GridDirectCollection(IgniteDataLoaderEntry.class) - private Collection<IgniteDataLoaderEntry> entries; - - /** {@code True} to ignore deployment ownership. */ - private boolean ignoreDepOwnership; - - /** */ - private boolean skipStore; - - /** */ - private DeploymentMode depMode; - - /** */ - private String sampleClsName; - - /** */ - private String userVer; - - /** Node class loader participants. */ - @GridToStringInclude - @GridDirectMap(keyType = UUID.class, valueType = IgniteUuid.class) - private Map<UUID, IgniteUuid> ldrParticipants; - - /** */ - private IgniteUuid clsLdrId; - - /** */ - private boolean forceLocDep; - - /** - * {@code Externalizable} support. - */ - public GridDataLoadRequest() { - // No-op. - } - - /** - * @param reqId Request ID. - * @param resTopicBytes Response topic. - * @param cacheName Cache name. - * @param updaterBytes Cache updater. - * @param entries Entries to put. - * @param ignoreDepOwnership Ignore ownership. - * @param skipStore Skip store flag. - * @param depMode Deployment mode. - * @param sampleClsName Sample class name. - * @param userVer User version. - * @param ldrParticipants Loader participants. - * @param clsLdrId Class loader ID. 
- * @param forceLocDep Force local deployment. - */ - public GridDataLoadRequest(long reqId, - byte[] resTopicBytes, - @Nullable String cacheName, - byte[] updaterBytes, - Collection<IgniteDataLoaderEntry> entries, - boolean ignoreDepOwnership, - boolean skipStore, - DeploymentMode depMode, - String sampleClsName, - String userVer, - Map<UUID, IgniteUuid> ldrParticipants, - IgniteUuid clsLdrId, - boolean forceLocDep) { - this.reqId = reqId; - this.resTopicBytes = resTopicBytes; - this.cacheName = cacheName; - this.updaterBytes = updaterBytes; - this.entries = entries; - this.ignoreDepOwnership = ignoreDepOwnership; - this.skipStore = skipStore; - this.depMode = depMode; - this.sampleClsName = sampleClsName; - this.userVer = userVer; - this.ldrParticipants = ldrParticipants; - this.clsLdrId = clsLdrId; - this.forceLocDep = forceLocDep; - } - - /** - * @return Request ID. - */ - public long requestId() { - return reqId; - } - - /** - * @return Response topic. - */ - public byte[] responseTopicBytes() { - return resTopicBytes; - } - - /** - * @return Cache name. - */ - public String cacheName() { - return cacheName; - } - - /** - * @return Updater. - */ - public byte[] updaterBytes() { - return updaterBytes; - } - - /** - * @return Entries to update. - */ - public Collection<IgniteDataLoaderEntry> entries() { - return entries; - } - - /** - * @return {@code True} to ignore ownership. - */ - public boolean ignoreDeploymentOwnership() { - return ignoreDepOwnership; - } - - /** - * @return Skip store flag. - */ - public boolean skipStore() { - return skipStore; - } - - /** - * @return Deployment mode. - */ - public DeploymentMode deploymentMode() { - return depMode; - } - - /** - * @return Sample class name. - */ - public String sampleClassName() { - return sampleClsName; - } - - /** - * @return User version. - */ - public String userVersion() { - return userVer; - } - - /** - * @return Participants. 
- */ - public Map<UUID, IgniteUuid> participants() { - return ldrParticipants; - } - - /** - * @return Class loader ID. - */ - public IgniteUuid classLoaderId() { - return clsLdrId; - } - - /** - * @return {@code True} to force local deployment. - */ - public boolean forceLocalDeployment() { - return forceLocDep; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridDataLoadRequest.class, this); - } - - /** {@inheritDoc} */ - @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { - writer.setBuffer(buf); - - if (!writer.isHeaderWritten()) { - if (!writer.writeHeader(directType(), fieldsCount())) - return false; - - writer.onHeaderWritten(); - } - - switch (writer.state()) { - case 0: - if (!writer.writeString("cacheName", cacheName)) - return false; - - writer.incrementState(); - - case 1: - if (!writer.writeIgniteUuid("clsLdrId", clsLdrId)) - return false; - - writer.incrementState(); - - case 2: - if (!writer.writeByte("depMode", depMode != null ? 
(byte)depMode.ordinal() : -1)) - return false; - - writer.incrementState(); - - case 3: - if (!writer.writeCollection("entries", entries, MessageCollectionItemType.MSG)) - return false; - - writer.incrementState(); - - case 4: - if (!writer.writeBoolean("forceLocDep", forceLocDep)) - return false; - - writer.incrementState(); - - case 5: - if (!writer.writeBoolean("ignoreDepOwnership", ignoreDepOwnership)) - return false; - - writer.incrementState(); - - case 6: - if (!writer.writeMap("ldrParticipants", ldrParticipants, MessageCollectionItemType.UUID, MessageCollectionItemType.IGNITE_UUID)) - return false; - - writer.incrementState(); - - case 7: - if (!writer.writeLong("reqId", reqId)) - return false; - - writer.incrementState(); - - case 8: - if (!writer.writeByteArray("resTopicBytes", resTopicBytes)) - return false; - - writer.incrementState(); - - case 9: - if (!writer.writeString("sampleClsName", sampleClsName)) - return false; - - writer.incrementState(); - - case 10: - if (!writer.writeBoolean("skipStore", skipStore)) - return false; - - writer.incrementState(); - - case 11: - if (!writer.writeByteArray("updaterBytes", updaterBytes)) - return false; - - writer.incrementState(); - - case 12: - if (!writer.writeString("userVer", userVer)) - return false; - - writer.incrementState(); - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { - reader.setBuffer(buf); - - if (!reader.beforeMessageRead()) - return false; - - switch (reader.state()) { - case 0: - cacheName = reader.readString("cacheName"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 1: - clsLdrId = reader.readIgniteUuid("clsLdrId"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 2: - byte depModeOrd; - - depModeOrd = reader.readByte("depMode"); - - if (!reader.isLastRead()) - return false; - - depMode = DeploymentMode.fromOrdinal(depModeOrd); - - 
reader.incrementState(); - - case 3: - entries = reader.readCollection("entries", MessageCollectionItemType.MSG); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 4: - forceLocDep = reader.readBoolean("forceLocDep"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 5: - ignoreDepOwnership = reader.readBoolean("ignoreDepOwnership"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 6: - ldrParticipants = reader.readMap("ldrParticipants", MessageCollectionItemType.UUID, MessageCollectionItemType.IGNITE_UUID, false); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 7: - reqId = reader.readLong("reqId"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 8: - resTopicBytes = reader.readByteArray("resTopicBytes"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 9: - sampleClsName = reader.readString("sampleClsName"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 10: - skipStore = reader.readBoolean("skipStore"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 11: - updaterBytes = reader.readByteArray("updaterBytes"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 12: - userVer = reader.readString("userVer"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public byte directType() { - return 62; - } - - /** {@inheritDoc} */ - @Override public byte fieldsCount() { - return 13; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2c679e5/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/GridDataLoadResponse.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/GridDataLoadResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/GridDataLoadResponse.java deleted file mode 100644 index 25ff9ce..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/GridDataLoadResponse.java +++ /dev/null @@ -1,166 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.datastream; - -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.plugin.extensions.communication.*; - -import java.nio.*; - -/** - * - */ -public class GridDataLoadResponse implements Message { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private long reqId; - - /** */ - private byte[] errBytes; - - /** */ - private boolean forceLocDep; - - /** - * @param reqId Request ID. - * @param errBytes Error bytes. - * @param forceLocDep Force local deployment. - */ - public GridDataLoadResponse(long reqId, byte[] errBytes, boolean forceLocDep) { - this.reqId = reqId; - this.errBytes = errBytes; - this.forceLocDep = forceLocDep; - } - - /** - * {@code Externalizable} support. 
- */ - public GridDataLoadResponse() { - // No-op. - } - - /** - * @return Request ID. - */ - public long requestId() { - return reqId; - } - - /** - * @return Error bytes. - */ - public byte[] errorBytes() { - return errBytes; - } - - /** - * @return {@code True} to force local deployment. - */ - public boolean forceLocalDeployment() { - return forceLocDep; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridDataLoadResponse.class, this); - } - - /** {@inheritDoc} */ - @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { - writer.setBuffer(buf); - - if (!writer.isHeaderWritten()) { - if (!writer.writeHeader(directType(), fieldsCount())) - return false; - - writer.onHeaderWritten(); - } - - switch (writer.state()) { - case 0: - if (!writer.writeByteArray("errBytes", errBytes)) - return false; - - writer.incrementState(); - - case 1: - if (!writer.writeBoolean("forceLocDep", forceLocDep)) - return false; - - writer.incrementState(); - - case 2: - if (!writer.writeLong("reqId", reqId)) - return false; - - writer.incrementState(); - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { - reader.setBuffer(buf); - - if (!reader.beforeMessageRead()) - return false; - - switch (reader.state()) { - case 0: - errBytes = reader.readByteArray("errBytes"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 1: - forceLocDep = reader.readBoolean("forceLocDep"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 2: - reqId = reader.readLong("reqId"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public byte directType() { - return 63; - } - - /** {@inheritDoc} */ - @Override public byte fieldsCount() { - return 3; - } -} 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2c679e5/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataLoaderEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataLoaderEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataLoaderEntry.java deleted file mode 100644 index 94f908b..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataLoaderEntry.java +++ /dev/null @@ -1,170 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.datastream; - -import org.apache.ignite.internal.processors.cache.*; -import org.apache.ignite.internal.util.tostring.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.plugin.extensions.communication.*; - -import java.nio.*; -import java.util.*; - -/** - * - */ -public class IgniteDataLoaderEntry implements Map.Entry<KeyCacheObject, CacheObject>, Message { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - @GridToStringInclude - protected KeyCacheObject key; - - /** */ - @GridToStringInclude - protected CacheObject val; - - /** - * - */ - public IgniteDataLoaderEntry() { - // No-op. - } - - /** - * @param key Key. - * @param val Value. - */ - public IgniteDataLoaderEntry(KeyCacheObject key, CacheObject val) { - this.key = key; - this.val = val; - } - - /** {@inheritDoc} */ - @Override public KeyCacheObject getKey() { - return key; - } - - /** {@inheritDoc} */ - @Override public CacheObject getValue() { - return val; - } - - /** {@inheritDoc} */ - @Override public CacheObject setValue(CacheObject val) { - CacheObject old = this.val; - - this.val = val; - - return old; - } - - /** - * @param ctx Cache context. - * @return Map entry unwrapping internal key and value. - */ - public <K, V> Map.Entry<K, V> toEntry(final GridCacheContext ctx) { - return new Map.Entry<K, V>() { - @Override public K getKey() { - return key.value(ctx.cacheObjectContext(), false); - } - - @Override public V setValue(V val) { - throw new UnsupportedOperationException(); - } - - @Override public V getValue() { - return val != null ? 
val.<V>value(ctx.cacheObjectContext(), false) : null; - } - }; - } - - /** {@inheritDoc} */ - @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { - writer.setBuffer(buf); - - if (!writer.isHeaderWritten()) { - if (!writer.writeHeader(directType(), fieldsCount())) - return false; - - writer.onHeaderWritten(); - } - - switch (writer.state()) { - case 0: - if (!writer.writeMessage("key", key)) - return false; - - writer.incrementState(); - - case 1: - if (!writer.writeMessage("val", val)) - return false; - - writer.incrementState(); - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { - reader.setBuffer(buf); - - if (!reader.beforeMessageRead()) - return false; - - switch (reader.state()) { - case 0: - key = reader.readMessage("key"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 1: - val = reader.readMessage("val"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public byte directType() { - return 95; - } - - /** {@inheritDoc} */ - @Override public byte fieldsCount() { - return 2; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(IgniteDataLoaderEntry.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2c679e5/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerCacheUpdaters.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerCacheUpdaters.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerCacheUpdaters.java deleted file mode 100644 index af3f358..0000000 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerCacheUpdaters.java +++ /dev/null @@ -1,206 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.datastream; - -import org.apache.ignite.*; -import org.apache.ignite.internal.processors.cache.*; -import org.apache.ignite.internal.util.typedef.*; -import org.jetbrains.annotations.*; - -import java.util.*; - -/** - * Bundled factory for cache updaters. - */ -public class IgniteDataStreamerCacheUpdaters { - /** */ - private static final IgniteDataStreamer.Updater INDIVIDUAL = new Individual(); - - /** */ - private static final IgniteDataStreamer.Updater BATCHED = new Batched(); - - /** */ - private static final IgniteDataStreamer.Updater BATCHED_SORTED = new BatchedSorted(); - - /** - * Updates cache using independent {@link org.apache.ignite.cache.GridCache#put(Object, Object, org.apache.ignite.lang.IgnitePredicate[])} and - * {@link org.apache.ignite.cache.GridCache#remove(Object, org.apache.ignite.lang.IgnitePredicate[])} operations. Thus it is safe from deadlocks but performance - * is not the best. - * - * @return Single updater. 
- */ - public static <K, V> IgniteDataStreamer.Updater<K, V> individual() { - return INDIVIDUAL; - } - - /** - * Updates cache using batched methods {@link org.apache.ignite.cache.GridCache#putAll(Map, org.apache.ignite.lang.IgnitePredicate[])} and - * {@link org.apache.ignite.cache.GridCache#removeAll(Collection, org.apache.ignite.lang.IgnitePredicate[])}. Can cause deadlocks if the same keys are getting - * updated concurrently. Performance is generally better than in {@link #individual()}. - * - * @return Batched updater. - */ - public static <K, V> IgniteDataStreamer.Updater<K, V> batched() { - return BATCHED; - } - - /** - * Updates cache using batched methods {@link org.apache.ignite.cache.GridCache#putAll(Map, org.apache.ignite.lang.IgnitePredicate[])} and - * {@link org.apache.ignite.cache.GridCache#removeAll(Collection, org.apache.ignite.lang.IgnitePredicate[])}. Keys are sorted in natural order and if all updates - * use the same rule deadlock can not happen. Performance is generally better than in {@link #individual()}. - * - * @return Batched sorted updater. - */ - public static <K extends Comparable<?>, V> IgniteDataStreamer.Updater<K, V> batchedSorted() { - return BATCHED_SORTED; - } - - /** - * Updates cache. - * - * @param cache Cache. - * @param rmvCol Keys to remove. - * @param putMap Entries to put. - * @throws IgniteException If failed. - */ - protected static <K, V> void updateAll(IgniteCache<K, V> cache, @Nullable Set<K> rmvCol, - Map<K, V> putMap) { - assert rmvCol != null || putMap != null; - - // Here we assume that there are no key duplicates, so the following calls are valid. - if (rmvCol != null) - ((IgniteCacheProxy<K, V>)cache).removeAll(rmvCol); - - if (putMap != null) - cache.putAll(putMap); - } - - /** - * Simple cache updater implementation. Updates keys one by one thus is not dead lock prone. 
- */ - private static class Individual<K, V> implements IgniteDataStreamer.Updater<K, V>, InternalUpdater { - /** */ - private static final long serialVersionUID = 0L; - - /** {@inheritDoc} */ - @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) { - assert cache != null; - assert !F.isEmpty(entries); - - for (Map.Entry<K, V> entry : entries) { - K key = entry.getKey(); - - assert key != null; - - V val = entry.getValue(); - - if (val == null) - cache.remove(key); - else - cache.put(key, val); - } - } - } - - /** - * Batched updater. Updates cache using batch operations thus is dead lock prone. - */ - private static class Batched<K, V> implements IgniteDataStreamer.Updater<K, V>, InternalUpdater { - /** */ - private static final long serialVersionUID = 0L; - - /** {@inheritDoc} */ - @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) { - assert cache != null; - assert !F.isEmpty(entries); - - Map<K, V> putAll = null; - Set<K> rmvAll = null; - - for (Map.Entry<K, V> entry : entries) { - K key = entry.getKey(); - - assert key != null; - - V val = entry.getValue(); - - if (val == null) { - if (rmvAll == null) - rmvAll = new HashSet<>(); - - rmvAll.add(key); - } - else { - if (putAll == null) - putAll = new HashMap<>(); - - putAll.put(key, val); - } - } - - updateAll(cache, rmvAll, putAll); - } - } - - /** - * Batched updater. Updates cache using batch operations thus is dead lock prone. 
- */ - private static class BatchedSorted<K, V> implements IgniteDataStreamer.Updater<K, V>, InternalUpdater { - /** */ - private static final long serialVersionUID = 0L; - - /** {@inheritDoc} */ - @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) { - assert cache != null; - assert !F.isEmpty(entries); - - Map<K, V> putAll = null; - Set<K> rmvAll = null; - - for (Map.Entry<K, V> entry : entries) { - K key = entry.getKey(); - - assert key instanceof Comparable; - - V val = entry.getValue(); - - if (val == null) { - if (rmvAll == null) - rmvAll = new TreeSet<>(); - - rmvAll.add(key); - } - else { - if (putAll == null) - putAll = new TreeMap<>(); - - putAll.put(key, val); - } - } - - updateAll(cache, rmvAll, putAll); - } - } - - /** - * Marker interface for updaters which do not need to unwrap cache objects. - */ - public static interface InternalUpdater { - // No-op. - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2c679e5/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerFuture.java deleted file mode 100644 index 8ed4271..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerFuture.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.datastream; - -import org.apache.ignite.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.util.future.*; -import org.apache.ignite.internal.util.tostring.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -/** - * Data streamer future. - */ -class IgniteDataStreamerFuture extends GridFutureAdapter<Object> { - /** */ - private static final long serialVersionUID = 0L; - - /** Data loader. */ - @GridToStringExclude - private IgniteDataStreamerImpl dataLdr; - - /** - * Default constructor for {@link java.io.Externalizable} support. - */ - public IgniteDataStreamerFuture() { - // No-op. - } - - /** - * @param ctx Context. - * @param dataLdr Data streamer. - */ - IgniteDataStreamerFuture(GridKernalContext ctx, IgniteDataStreamerImpl dataLdr) { - assert dataLdr != null; - - this.dataLdr = dataLdr; - } - - /** {@inheritDoc} */ - @Override public boolean cancel() throws IgniteCheckedException { - if (onCancelled()) { - dataLdr.closeEx(true); - - return true; - } - - return false; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(IgniteDataStreamerFuture.class, this, super.toString()); - } -}