http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/60a1a481/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerProcessor.java new file mode 100644 index 0000000..c7abd3a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerProcessor.java @@ -0,0 +1,307 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.datastreamer; + +import org.apache.ignite.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.managers.communication.*; +import org.apache.ignite.internal.managers.deployment.*; +import org.apache.ignite.internal.processors.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.internal.util.worker.*; +import org.apache.ignite.marshaller.*; +import org.apache.ignite.thread.*; +import org.jetbrains.annotations.*; + +import java.util.*; +import java.util.concurrent.*; + +import static org.apache.ignite.internal.GridTopic.*; +import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*; + +/** + * + */ +public class DataStreamerProcessor<K, V> extends GridProcessorAdapter { + /** Loaders map (access is not supposed to be highly concurrent). */ + private Collection<DataStreamerImpl> ldrs = new GridConcurrentHashSet<>(); + + /** Busy lock. */ + private final GridSpinBusyLock busyLock = new GridSpinBusyLock(); + + /** Flushing thread. */ + private Thread flusher; + + /** */ + private final DelayQueue<DataStreamerImpl<K, V>> flushQ = new DelayQueue<>(); + + /** Marshaller. */ + private final Marshaller marsh; + + /** + * @param ctx Kernal context. + */ + public DataStreamerProcessor(GridKernalContext ctx) { + super(ctx); + + ctx.io().addMessageListener(TOPIC_DATASTREAM, new GridMessageListener() { + @Override public void onMessage(UUID nodeId, Object msg) { + assert msg instanceof DataStreamerRequest; + + processRequest(nodeId, (DataStreamerRequest)msg); + } + }); + + marsh = ctx.config().getMarshaller(); + } + + /** {@inheritDoc} */ + @Override public void start() throws IgniteCheckedException { + if (ctx.config().isDaemon()) + return; + + flusher = new IgniteThread(new GridWorker(ctx.gridName(), "grid-data-loader-flusher", log) { + @Override protected void body() throws InterruptedException { + while (!isCancelled()) { + DataStreamerImpl<K, V> ldr = flushQ.take(); + + if (!busyLock.enterBusy()) + return; + + try { + if (ldr.isClosed()) + continue; + + ldr.tryFlush(); + + flushQ.offer(ldr); + } + finally { + busyLock.leaveBusy(); + } + } + } + }); + + flusher.start(); + + if (log.isDebugEnabled()) + log.debug("Started data streamer processor."); + } + + /** {@inheritDoc} */ + @Override public void onKernalStop(boolean cancel) { + if (ctx.config().isDaemon()) + return; + + ctx.io().removeMessageListener(TOPIC_DATASTREAM); + + busyLock.block(); + + U.interrupt(flusher); + U.join(flusher, log); + + for (DataStreamerImpl<?, ?> ldr : ldrs) { + if (log.isDebugEnabled()) + log.debug("Closing active data streamer on grid stop [ldr=" + ldr + ", cancel=" + cancel + ']'); + + try { + ldr.closeEx(cancel); + } + catch (IgniteInterruptedCheckedException e) { + U.warn(log, "Interrupted while waiting for completion of the data streamer: " + ldr, e); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to close data streamer: " + ldr, e); + } + } + + if (log.isDebugEnabled()) + log.debug("Stopped data streamer processor."); + } + + /** + * @param cacheName Cache name ({@code null} for default cache). + * @return Data loader. + */ + public DataStreamerImpl<K, V> dataStreamer(@Nullable String cacheName) { + if (!busyLock.enterBusy()) + throw new IllegalStateException("Failed to create data streamer (grid is stopping)."); + + try { + final DataStreamerImpl<K, V> ldr = new DataStreamerImpl<>(ctx, cacheName, flushQ); + + ldrs.add(ldr); + + ldr.internalFuture().listen(new CI1<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> f) { + boolean b = ldrs.remove(ldr); + + assert b : "Loader has not been added to set: " + ldr; + + if (log.isDebugEnabled()) + log.debug("Loader has been completed: " + ldr); + } + }); + + return ldr; + } + finally { + busyLock.leaveBusy(); + } + } + + /** + * @param nodeId Sender ID. + * @param req Request. + */ + private void processRequest(UUID nodeId, DataStreamerRequest req) { + if (!busyLock.enterBusy()) { + if (log.isDebugEnabled()) + log.debug("Ignoring data load request (node is stopping): " + req); + + return; + } + + try { + if (log.isDebugEnabled()) + log.debug("Processing data load request: " + req); + + Object topic; + + try { + topic = marsh.unmarshal(req.responseTopicBytes(), null); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to unmarshal topic from request: " + req, e); + + return; + } + + ClassLoader clsLdr; + + if (req.forceLocalDeployment()) + clsLdr = U.gridClassLoader(); + else { + GridDeployment dep = ctx.deploy().getGlobalDeployment( + req.deploymentMode(), + req.sampleClassName(), + req.sampleClassName(), + req.userVersion(), + nodeId, + req.classLoaderId(), + req.participants(), + null); + + if (dep == null) { + sendResponse(nodeId, + topic, + req.requestId(), + new IgniteCheckedException("Failed to get deployment for request [sndId=" + nodeId + + ", req=" + req + ']'), + false); + + return; + } + + clsLdr = dep.classLoader(); + } + + IgniteDataStreamer.Updater<K, V> updater; + + try { + updater = marsh.unmarshal(req.updaterBytes(), clsLdr); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to unmarshal message [nodeId=" + nodeId + ", req=" + req + ']', e); + + sendResponse(nodeId, topic, req.requestId(), e, false); + + return; + } + + Collection<DataStreamerEntry> col = req.entries(); + + DataStreamerUpdateJob job = new DataStreamerUpdateJob(ctx, + log, + req.cacheName(), + col, + req.ignoreDeploymentOwnership(), + req.skipStore(), + updater); + + Exception err = null; + + try { + job.call(); + } + catch (Exception e) { + U.error(log, "Failed to finish update job.", e); + + err = e; + } + + sendResponse(nodeId, topic, req.requestId(), err, req.forceLocalDeployment()); + } + finally { + busyLock.leaveBusy(); + } + } + + /** + * @param nodeId Node ID. + * @param resTopic Response topic. + * @param reqId Request ID. + * @param err Error. + * @param forceLocDep Force local deployment. + */ + private void sendResponse(UUID nodeId, Object resTopic, long reqId, @Nullable Throwable err, + boolean forceLocDep) { + byte[] errBytes; + + try { + errBytes = err != null ? marsh.marshal(err) : null; + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to marshal message.", e); + + return; + } + + DataStreamerResponse res = new DataStreamerResponse(reqId, errBytes, forceLocDep); + + try { + ctx.io().send(nodeId, resTopic, res, PUBLIC_POOL); + } + catch (IgniteCheckedException e) { + if (ctx.discovery().alive(nodeId)) + U.error(log, "Failed to respond to node [nodeId=" + nodeId + ", res=" + res + ']', e); + else if (log.isDebugEnabled()) + log.debug("Node has left the grid: " + nodeId); + } + } + + /** {@inheritDoc} */ + @Override public void printMemoryStats() { + X.println(">>>"); + X.println(">>> Data streamer processor memory stats [grid=" + ctx.gridName() + ']'); + X.println(">>> ldrsSize: " + ldrs.size()); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/60a1a481/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java new file mode 100644 index 0000000..26f4965 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java @@ -0,0 +1,451 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.datastreamer; + +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.plugin.extensions.communication.*; +import org.jetbrains.annotations.*; + +import java.nio.*; +import java.util.*; + +/** + * + */ +public class DataStreamerRequest implements Message { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private long reqId; + + /** */ + private byte[] resTopicBytes; + + /** Cache name. */ + private String cacheName; + + /** */ + private byte[] updaterBytes; + + /** Entries to update. */ + @GridDirectCollection(DataStreamerEntry.class) + private Collection<DataStreamerEntry> entries; + + /** {@code True} to ignore deployment ownership. */ + private boolean ignoreDepOwnership; + + /** */ + private boolean skipStore; + + /** */ + private DeploymentMode depMode; + + /** */ + private String sampleClsName; + + /** */ + private String userVer; + + /** Node class loader participants. */ + @GridToStringInclude + @GridDirectMap(keyType = UUID.class, valueType = IgniteUuid.class) + private Map<UUID, IgniteUuid> ldrParticipants; + + /** */ + private IgniteUuid clsLdrId; + + /** */ + private boolean forceLocDep; + + /** + * {@code Externalizable} support. + */ + public DataStreamerRequest() { + // No-op. + } + + /** + * @param reqId Request ID. + * @param resTopicBytes Response topic. + * @param cacheName Cache name. + * @param updaterBytes Cache updater. + * @param entries Entries to put. + * @param ignoreDepOwnership Ignore ownership. + * @param skipStore Skip store flag. + * @param depMode Deployment mode. + * @param sampleClsName Sample class name. + * @param userVer User version. + * @param ldrParticipants Loader participants. + * @param clsLdrId Class loader ID. + * @param forceLocDep Force local deployment. + */ + public DataStreamerRequest(long reqId, + byte[] resTopicBytes, + @Nullable String cacheName, + byte[] updaterBytes, + Collection<DataStreamerEntry> entries, + boolean ignoreDepOwnership, + boolean skipStore, + DeploymentMode depMode, + String sampleClsName, + String userVer, + Map<UUID, IgniteUuid> ldrParticipants, + IgniteUuid clsLdrId, + boolean forceLocDep) { + this.reqId = reqId; + this.resTopicBytes = resTopicBytes; + this.cacheName = cacheName; + this.updaterBytes = updaterBytes; + this.entries = entries; + this.ignoreDepOwnership = ignoreDepOwnership; + this.skipStore = skipStore; + this.depMode = depMode; + this.sampleClsName = sampleClsName; + this.userVer = userVer; + this.ldrParticipants = ldrParticipants; + this.clsLdrId = clsLdrId; + this.forceLocDep = forceLocDep; + } + + /** + * @return Request ID. + */ + public long requestId() { + return reqId; + } + + /** + * @return Response topic. + */ + public byte[] responseTopicBytes() { + return resTopicBytes; + } + + /** + * @return Cache name. + */ + public String cacheName() { + return cacheName; + } + + /** + * @return Updater. + */ + public byte[] updaterBytes() { + return updaterBytes; + } + + /** + * @return Entries to update. + */ + public Collection<DataStreamerEntry> entries() { + return entries; + } + + /** + * @return {@code True} to ignore ownership. + */ + public boolean ignoreDeploymentOwnership() { + return ignoreDepOwnership; + } + + /** + * @return Skip store flag. + */ + public boolean skipStore() { + return skipStore; + } + + /** + * @return Deployment mode. + */ + public DeploymentMode deploymentMode() { + return depMode; + } + + /** + * @return Sample class name. + */ + public String sampleClassName() { + return sampleClsName; + } + + /** + * @return User version. + */ + public String userVersion() { + return userVer; + } + + /** + * @return Participants. + */ + public Map<UUID, IgniteUuid> participants() { + return ldrParticipants; + } + + /** + * @return Class loader ID. + */ + public IgniteUuid classLoaderId() { + return clsLdrId; + } + + /** + * @return {@code True} to force local deployment. + */ + public boolean forceLocalDeployment() { + return forceLocDep; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(DataStreamerRequest.class, this); + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeString("cacheName", cacheName)) + return false; + + writer.incrementState(); + + case 1: + if (!writer.writeIgniteUuid("clsLdrId", clsLdrId)) + return false; + + writer.incrementState(); + + case 2: + if (!writer.writeByte("depMode", depMode != null ? (byte)depMode.ordinal() : -1)) + return false; + + writer.incrementState(); + + case 3: + if (!writer.writeCollection("entries", entries, MessageCollectionItemType.MSG)) + return false; + + writer.incrementState(); + + case 4: + if (!writer.writeBoolean("forceLocDep", forceLocDep)) + return false; + + writer.incrementState(); + + case 5: + if (!writer.writeBoolean("ignoreDepOwnership", ignoreDepOwnership)) + return false; + + writer.incrementState(); + + case 6: + if (!writer.writeMap("ldrParticipants", ldrParticipants, MessageCollectionItemType.UUID, MessageCollectionItemType.IGNITE_UUID)) + return false; + + writer.incrementState(); + + case 7: + if (!writer.writeLong("reqId", reqId)) + return false; + + writer.incrementState(); + + case 8: + if (!writer.writeByteArray("resTopicBytes", resTopicBytes)) + return false; + + writer.incrementState(); + + case 9: + if (!writer.writeString("sampleClsName", sampleClsName)) + return false; + + writer.incrementState(); + + case 10: + if (!writer.writeBoolean("skipStore", skipStore)) + return false; + + writer.incrementState(); + + case 11: + if (!writer.writeByteArray("updaterBytes", updaterBytes)) + return false; + + writer.incrementState(); + + case 12: + if (!writer.writeString("userVer", userVer)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + cacheName = reader.readString("cacheName"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 1: + clsLdrId = reader.readIgniteUuid("clsLdrId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 2: + byte depModeOrd; + + depModeOrd = reader.readByte("depMode"); + + if (!reader.isLastRead()) + return false; + + depMode = DeploymentMode.fromOrdinal(depModeOrd); + + reader.incrementState(); + + case 3: + entries = reader.readCollection("entries", MessageCollectionItemType.MSG); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 4: + forceLocDep = reader.readBoolean("forceLocDep"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 5: + ignoreDepOwnership = reader.readBoolean("ignoreDepOwnership"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 6: + ldrParticipants = reader.readMap("ldrParticipants", MessageCollectionItemType.UUID, MessageCollectionItemType.IGNITE_UUID, false); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 7: + reqId = reader.readLong("reqId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 8: + resTopicBytes = reader.readByteArray("resTopicBytes"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 9: + sampleClsName = reader.readString("sampleClsName"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 10: + skipStore = reader.readBoolean("skipStore"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 11: + updaterBytes = reader.readByteArray("updaterBytes"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 12: + userVer = reader.readString("userVer"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return 62; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 13; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/60a1a481/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerResponse.java new file mode 100644 index 0000000..8aee0d5 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerResponse.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.datastreamer; + +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.plugin.extensions.communication.*; + +import java.nio.*; + +/** + * + */ +public class DataStreamerResponse implements Message { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private long reqId; + + /** */ + private byte[] errBytes; + + /** */ + private boolean forceLocDep; + + /** + * @param reqId Request ID. + * @param errBytes Error bytes. + * @param forceLocDep Force local deployment. + */ + public DataStreamerResponse(long reqId, byte[] errBytes, boolean forceLocDep) { + this.reqId = reqId; + this.errBytes = errBytes; + this.forceLocDep = forceLocDep; + } + + /** + * {@code Externalizable} support. + */ + public DataStreamerResponse() { + // No-op. + } + + /** + * @return Request ID. + */ + public long requestId() { + return reqId; + } + + /** + * @return Error bytes. + */ + public byte[] errorBytes() { + return errBytes; + } + + /** + * @return {@code True} to force local deployment. + */ + public boolean forceLocalDeployment() { + return forceLocDep; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(DataStreamerResponse.class, this); + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeByteArray("errBytes", errBytes)) + return false; + + writer.incrementState(); + + case 1: + if (!writer.writeBoolean("forceLocDep", forceLocDep)) + return false; + + writer.incrementState(); + + case 2: + if (!writer.writeLong("reqId", reqId)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + errBytes = reader.readByteArray("errBytes"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 1: + forceLocDep = reader.readBoolean("forceLocDep"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 2: + reqId = reader.readLong("reqId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return 63; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 3; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/60a1a481/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java new file mode 100644 index 0000000..1faf22d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.datastreamer; + +import org.apache.ignite.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.internal.util.typedef.*; +import org.jetbrains.annotations.*; + +import java.util.*; + +/** + * Job to put entries to cache on affinity node. + */ +class DataStreamerUpdateJob implements GridPlainCallable<Object> { + /** */ + private final GridKernalContext ctx; + + /** */ + private final IgniteLogger log; + + /** Cache name. */ + private final String cacheName; + + /** Entries to put. */ + private final Collection<DataStreamerEntry> col; + + /** {@code True} to ignore deployment ownership. */ + private final boolean ignoreDepOwnership; + + /** */ + private final boolean skipStore; + + /** */ + private final IgniteDataStreamer.Updater updater; + + /** + * @param ctx Context. + * @param log Log. + * @param cacheName Cache name. + * @param col Entries to put. + * @param ignoreDepOwnership {@code True} to ignore deployment ownership. + * @param skipStore Skip store flag. + * @param updater Updater. + */ + DataStreamerUpdateJob( + GridKernalContext ctx, + IgniteLogger log, + @Nullable String cacheName, + Collection<DataStreamerEntry> col, + boolean ignoreDepOwnership, + boolean skipStore, + IgniteDataStreamer.Updater<?, ?> updater) { + this.ctx = ctx; + this.log = log; + + assert col != null && !col.isEmpty(); + assert updater != null; + + this.cacheName = cacheName; + this.col = col; + this.ignoreDepOwnership = ignoreDepOwnership; + this.skipStore = skipStore; + this.updater = updater; + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public Object call() throws Exception { + if (log.isDebugEnabled()) + log.debug("Running put job [nodeId=" + ctx.localNodeId() + ", size=" + col.size() + ']'); + +// TODO IGNITE-77: restore adapter usage. +// GridCacheAdapter<Object, Object> cache = ctx.cache().internalCache(cacheName); +// +// IgniteFuture<?> f = cache.context().preloader().startFuture(); +// +// if (!f.isDone()) +// f.get(); +// +// if (ignoreDepOwnership) +// cache.context().deploy().ignoreOwnership(true); + + IgniteCacheProxy cache = ctx.cache().jcache(cacheName); + + if (skipStore) + cache = (IgniteCacheProxy<?, ?>)cache.withSkipStore(); + + if (ignoreDepOwnership) + cache.context().deploy().ignoreOwnership(true); + + try { + final GridCacheContext cctx = cache.context(); + + for (DataStreamerEntry e : col) { + e.getKey().finishUnmarshal(cctx.cacheObjectContext(), cctx.deploy().globalLoader()); + + CacheObject val = e.getValue(); + + if (val != null) + val.finishUnmarshal(cctx.cacheObjectContext(), cctx.deploy().globalLoader()); + } + + if (unwrapEntries()) { + Collection<Map.Entry> col0 = F.viewReadOnly(col, new C1<DataStreamerEntry, Map.Entry>() { + @Override public Map.Entry apply(DataStreamerEntry e) { + return e.toEntry(cctx); + } + }); + + updater.update(cache, col0); + } + else + updater.update(cache, col); + + return null; + } + finally { + if (ignoreDepOwnership) + cache.context().deploy().ignoreOwnership(false); + + if (log.isDebugEnabled()) + log.debug("Update job finished on node: " + ctx.localNodeId()); + } + } + + /** + * @return {@code True} if need to unwrap internal entries. + */ + private boolean unwrapEntries() { + return !(updater instanceof DataStreamerCacheUpdaters.InternalUpdater); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/60a1a481/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/package.html ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/package.html b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/package.html new file mode 100644 index 0000000..1090b86 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/package.html @@ -0,0 +1,24 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> +<html> +<body> + <!-- Package description. --> + Data streamer processor. +</body> +</html> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/60a1a481/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/IgniteDrDataStreamerCacheUpdater.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/IgniteDrDataStreamerCacheUpdater.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/IgniteDrDataStreamerCacheUpdater.java index 9ceeb0d..af8b088 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/IgniteDrDataStreamerCacheUpdater.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/IgniteDrDataStreamerCacheUpdater.java @@ -23,7 +23,7 @@ import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.dr.*; import org.apache.ignite.internal.processors.cache.version.*; -import org.apache.ignite.internal.processors.datastream.*; +import org.apache.ignite.internal.processors.datastreamer.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/60a1a481/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java index 378ef6b..6f7cb92 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java @@ -31,7 +31,7 @@ import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.managers.eventstorage.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.transactions.*; -import org.apache.ignite.internal.processors.datastream.*; +import org.apache.ignite.internal.processors.datastreamer.*; import org.apache.ignite.internal.processors.task.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.future.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/60a1a481/modules/core/src/test/java/org/apache/ignite/internal/processors/datastream/DataStreamerImplSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastream/DataStreamerImplSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastream/DataStreamerImplSelfTest.java deleted file mode 100644 index 9f6356b..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastream/DataStreamerImplSelfTest.java +++ /dev/null @@ -1,205 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.datastream; - -import org.apache.ignite.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.spi.discovery.tcp.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; -import org.apache.ignite.testframework.junits.common.*; - -import java.io.*; -import java.util.*; -import java.util.concurrent.*; - -import static org.apache.ignite.cache.CacheMode.*; -import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; - -/** - * Tests for {@code IgniteDataStreamerImpl}. - */ -public class DataStreamerImplSelfTest extends GridCommonAbstractTest { - /** IP finder. */ - private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); - - /** Number of keys to load via data streamer. */ - private static final int KEYS_COUNT = 1000; - - /** Started grid counter. */ - private static int cnt; - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); - discoSpi.setIpFinder(IP_FINDER); - - cfg.setDiscoverySpi(discoSpi); - - // Forth node goes without cache. - if (cnt < 4) - cfg.setCacheConfiguration(cacheConfiguration()); - - cnt++; - - return cfg; - } - - /** - * @throws Exception If failed. - */ - public void testNullPointerExceptionUponDataStreamerClosing() throws Exception { - try { - startGrids(5); - - final CyclicBarrier barrier = new CyclicBarrier(2); - - multithreadedAsync(new Callable<Object>() { - @Override public Object call() throws Exception { - U.awaitQuiet(barrier); - - G.stopAll(true); - - return null; - } - }, 1); - - Ignite g4 = grid(4); - - IgniteDataStreamer<Object, Object> dataLdr = g4.dataStreamer(null); - - dataLdr.perNodeBufferSize(32); - - for (int i = 0; i < 100000; i += 2) { - dataLdr.addData(i, i); - dataLdr.removeData(i + 1); - } - - U.awaitQuiet(barrier); - - info("Closing data streamer."); - - try { - dataLdr.close(true); - } - catch (IllegalStateException ignore) { - // This is ok to ignore this exception as test is racy by it's nature - - // grid is stopping in different thread. - } - } - finally { - G.stopAll(true); - } - } - - /** - * Data streamer should correctly load entries from HashMap in case of grids with more than one node - * and with GridOptimizedMarshaller that requires serializable. - * - * @throws Exception If failed. - */ - public void testAddDataFromMap() throws Exception { - try { - cnt = 0; - - startGrids(2); - - Ignite g0 = grid(0); - - IgniteDataStreamer<Integer, String> dataLdr = g0.dataStreamer(null); - - Map<Integer, String> map = U.newHashMap(KEYS_COUNT); - - for (int i = 0; i < KEYS_COUNT; i ++) - map.put(i, String.valueOf(i)); - - dataLdr.addData(map); - - dataLdr.close(); - - Random rnd = new Random(); - - IgniteCache<Integer, String> c = g0.jcache(null); - - for (int i = 0; i < KEYS_COUNT; i ++) { - Integer k = rnd.nextInt(KEYS_COUNT); - - String v = c.get(k); - - assertEquals(k.toString(), v); - } - } - finally { - G.stopAll(true); - } - } - - /** - * Gets cache configuration. - * - * @return Cache configuration. - */ - private CacheConfiguration cacheConfiguration() { - CacheConfiguration cacheCfg = defaultCacheConfiguration(); - - cacheCfg.setCacheMode(PARTITIONED); - cacheCfg.setBackups(1); - cacheCfg.setWriteSynchronizationMode(FULL_SYNC); - - return cacheCfg; - } - - /** - * - */ - private static class TestObject implements Serializable { - /** */ - private int val; - - /** - */ - private TestObject() { - // No-op. - } - - /** - * @param val Value. - */ - private TestObject(int val) { - this.val = val; - } - - public Integer val() { - return val; - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - return val; - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object obj) { - return obj instanceof TestObject && ((TestObject)obj).val == val; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/60a1a481/modules/core/src/test/java/org/apache/ignite/internal/processors/datastream/DataStreamerProcessorSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastream/DataStreamerProcessorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastream/DataStreamerProcessorSelfTest.java deleted file mode 100644 index 170875c..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastream/DataStreamerProcessorSelfTest.java +++ /dev/null @@ -1,970 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.datastream; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.cache.affinity.*; -import org.apache.ignite.cache.store.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.events.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.processors.cache.*; -import org.apache.ignite.internal.processors.cache.distributed.near.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.marshaller.optimized.*; -import org.apache.ignite.spi.discovery.tcp.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; -import org.apache.ignite.testframework.junits.common.*; -import org.jetbrains.annotations.*; - -import javax.cache.*; -import javax.cache.configuration.*; -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -import static java.util.concurrent.TimeUnit.*; -import static org.apache.ignite.cache.CacheAtomicityMode.*; -import static org.apache.ignite.cache.CacheDistributionMode.*; -import static org.apache.ignite.cache.CacheMode.*; -import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; -import static org.apache.ignite.events.EventType.*; - -/** - * - */ -public class DataStreamerProcessorSelfTest extends GridCommonAbstractTest { - /** */ - private static ConcurrentHashMap<Object, Object> storeMap; - - /** */ - private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); - - /** */ - private CacheMode mode = PARTITIONED; - - /** */ - private boolean nearEnabled = true; - - /** */ - private boolean useCache; - - /** */ - private TestStore store; - - /** {@inheritDoc} */ - @Override public void afterTest() throws Exception { - super.afterTest(); - - useCache = false; - } - - /** {@inheritDoc} */ - @SuppressWarnings({"IfMayBeConditional", "unchecked"}) - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - TcpDiscoverySpi spi = new TcpDiscoverySpi(); - - spi.setIpFinder(ipFinder); - - cfg.setDiscoverySpi(spi); - - cfg.setIncludeProperties(); - - cfg.setMarshaller(new OptimizedMarshaller(false)); - - if (useCache) { - CacheConfiguration cc = defaultCacheConfiguration(); - - cc.setCacheMode(mode); - cc.setAtomicityMode(TRANSACTIONAL); - cc.setDistributionMode(nearEnabled ? NEAR_PARTITIONED : PARTITIONED_ONLY); - cc.setWriteSynchronizationMode(FULL_SYNC); - - cc.setEvictSynchronized(false); - cc.setEvictNearSynchronized(false); - - if (store != null) { - cc.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(store)); - cc.setReadThrough(true); - cc.setWriteThrough(true); - } - - cfg.setCacheConfiguration(cc); - } - else - cfg.setCacheConfiguration(); - - return cfg; - } - - /** - * @throws Exception If failed. - */ - public void testPartitioned() throws Exception { - mode = PARTITIONED; - - checkDataStreamer(); - } - - /** - * @throws Exception If failed. - */ - public void testColocated() throws Exception { - mode = PARTITIONED; - nearEnabled = false; - - checkDataStreamer(); - } - - /** - * @throws Exception If failed. - */ - public void testReplicated() throws Exception { - mode = REPLICATED; - - checkDataStreamer(); - } - - /** - * @throws Exception If failed. - */ - public void testLocal() throws Exception { - mode = LOCAL; - - try { - checkDataStreamer(); - - assert false; - } - catch (IgniteCheckedException e) { - // Cannot load local cache configured remotely. - info("Caught expected exception: " + e); - } - } - - /** - * @throws Exception If failed. - */ - @SuppressWarnings("ErrorNotRethrown") - private void checkDataStreamer() throws Exception { - try { - Ignite g1 = startGrid(1); - - useCache = true; - - Ignite g2 = startGrid(2); - startGrid(3); - - final IgniteDataStreamer<Integer, Integer> ldr = g1.dataStreamer(null); - - ldr.updater(DataStreamerCacheUpdaters.<Integer, Integer>batchedSorted()); - - final AtomicInteger idxGen = new AtomicInteger(); - final int cnt = 400; - final int threads = 10; - - final CountDownLatch l1 = new CountDownLatch(threads); - - IgniteInternalFuture<?> f1 = multithreadedAsync(new Callable<Object>() { - @Override public Object call() throws Exception { - Collection<IgniteFuture<?>> futs = new ArrayList<>(cnt); - - for (int i = 0; i < cnt; i++) { - int idx = idxGen.getAndIncrement(); - - futs.add(ldr.addData(idx, idx)); - } - - l1.countDown(); - - for (IgniteFuture<?> fut : futs) - fut.get(); - - return null; - } - }, threads); - - l1.await(); - - // This will wait until data streamer finishes loading. - stopGrid(getTestGridName(1), false); - - f1.get(); - - int s2 = internalCache(2).primaryKeySet().size(); - int s3 = internalCache(3).primaryKeySet().size(); - int total = threads * cnt; - - assertEquals(total, s2 + s3); - - final IgniteDataStreamer<Integer, Integer> rmvLdr = g2.dataStreamer(null); - - rmvLdr.updater(DataStreamerCacheUpdaters.<Integer, Integer>batchedSorted()); - - final CountDownLatch l2 = new CountDownLatch(threads); - - IgniteInternalFuture<?> f2 = multithreadedAsync(new Callable<Object>() { - @Override public Object call() throws Exception { - Collection<IgniteFuture<?>> futs = new ArrayList<>(cnt); - - for (int i = 0; i < cnt; i++) { - final int key = idxGen.decrementAndGet(); - - futs.add(rmvLdr.removeData(key)); - } - - l2.countDown(); - - for (IgniteFuture<?> fut : futs) - fut.get(); - - return null; - } - }, threads); - - l2.await(); - - rmvLdr.close(false); - - f2.get(); - - s2 = internalCache(2).primaryKeySet().size(); - s3 = internalCache(3).primaryKeySet().size(); - - assert s2 == 0 && s3 == 0 : "Incorrect entries count [s2=" + s2 + ", s3=" + s3 + ']'; - } - finally { - stopAllGrids(); - } - } - - /** - * @throws Exception If failed. - */ - public void testPartitionedIsolated() throws Exception { - mode = PARTITIONED; - - checkIsolatedDataStreamer(); - } - - /** - * @throws Exception If failed. - */ - public void testReplicatedIsolated() throws Exception { - mode = REPLICATED; - - checkIsolatedDataStreamer(); - } - - /** - * @throws Exception If failed. - */ - private void checkIsolatedDataStreamer() throws Exception { - try { - useCache = true; - - Ignite g1 = startGrid(0); - startGrid(1); - startGrid(2); - - awaitPartitionMapExchange(); - - IgniteCache<Integer, Integer> cache = grid(0).jcache(null); - - for (int i = 0; i < 100; i++) - cache.put(i, -1); - - final int cnt = 40_000; - final int threads = 10; - - try (final IgniteDataStreamer<Integer, Integer> ldr = g1.dataStreamer(null)) { - final AtomicInteger idxGen = new AtomicInteger(); - - IgniteInternalFuture<?> f1 = multithreadedAsync(new Callable<Object>() { - @Override public Object call() throws Exception { - for (int i = 0; i < cnt; i++) { - int idx = idxGen.getAndIncrement(); - - ldr.addData(idx, idx); - } - - return null; - } - }, threads); - - f1.get(); - } - - for (int g = 0; g < 3; g++) { - ClusterNode locNode = grid(g).localNode(); - - GridCacheAdapter<Integer, Integer> cache0 = ((IgniteKernal)grid(g)).internalCache(null); - - if (cache0.isNear()) - cache0 = ((GridNearCacheAdapter<Integer, Integer>)cache0).dht(); - - CacheAffinity<Integer> aff = cache0.affinity(); - - for (int key = 0; key < cnt * threads; key++) { - if (aff.isPrimary(locNode, key) || aff.isBackup(locNode, key)) { - GridCacheEntryEx entry = cache0.peekEx(key); - - assertNotNull("Missing entry for key: " + key, entry); - assertEquals((key < 100 ? -1 : key), - CU.value(entry.rawGetOrUnmarshal(false), cache0.context(), false)); - } - } - } - } - finally { - stopAllGrids(); - } - } - - /** - * Test primitive arrays can be passed into data streamer. - * - * @throws Exception If failed. - */ - public void testPrimitiveArrays() throws Exception { - try { - useCache = true; - mode = PARTITIONED; - - Ignite g1 = startGrid(1); - startGrid(2); // Reproduced only for several nodes in topology (if marshalling is used). - - List<Object> arrays = Arrays.<Object>asList( - new byte[] {1}, new boolean[] {true, false}, new char[] {2, 3}, new short[] {3, 4}, - new int[] {4, 5}, new long[] {5, 6}, new float[] {6, 7}, new double[] {7, 8}); - - IgniteDataStreamer<Object, Object> dataLdr = g1.dataStreamer(null); - - for (int i = 0, size = arrays.size(); i < 1000; i++) { - Object arr = arrays.get(i % size); - - dataLdr.addData(i, arr); - dataLdr.addData(i, fixedClosure(arr)); - } - - dataLdr.close(false); - } - finally { - stopAllGrids(); - } - } - - /** - * @throws Exception If failed. - */ - public void testReplicatedMultiThreaded() throws Exception { - mode = REPLICATED; - - checkLoaderMultithreaded(1, 2); - } - - /** - * @throws Exception If failed. - */ - public void testPartitionedMultiThreaded() throws Exception { - mode = PARTITIONED; - - checkLoaderMultithreaded(1, 3); - } - - /** - * Tests loader in multithreaded environment with various count of grids started. - * - * @param nodesCntNoCache How many nodes should be started without cache. - * @param nodesCntCache How many nodes should be started with cache. - * @throws Exception If failed. - */ - protected void checkLoaderMultithreaded(int nodesCntNoCache, int nodesCntCache) - throws Exception { - try { - // Start all required nodes. - int idx = 1; - - for (int i = 0; i < nodesCntNoCache; i++) - startGrid(idx++); - - useCache = true; - - for (int i = 0; i < nodesCntCache; i++) - startGrid(idx++); - - Ignite g1 = grid(1); - - // Get and configure loader. - final IgniteDataStreamer<Integer, Integer> ldr = g1.dataStreamer(null); - - ldr.updater(DataStreamerCacheUpdaters.<Integer, Integer>individual()); - ldr.perNodeBufferSize(2); - - // Define count of puts. - final AtomicInteger idxGen = new AtomicInteger(); - - final AtomicBoolean done = new AtomicBoolean(); - - try { - final int totalPutCnt = 50000; - - IgniteInternalFuture<?> fut1 = multithreadedAsync(new Callable<Object>() { - @Override public Object call() throws Exception { - Collection<IgniteFuture<?>> futs = new ArrayList<>(); - - while (!done.get()) { - int idx = idxGen.getAndIncrement(); - - if (idx >= totalPutCnt) { - info(">>> Stopping producer thread since maximum count of puts reached."); - - break; - } - - futs.add(ldr.addData(idx, idx)); - } - - ldr.flush(); - - for (IgniteFuture<?> fut : futs) - fut.get(); - - return null; - } - }, 5, "producer"); - - IgniteInternalFuture<?> fut2 = multithreadedAsync(new Callable<Object>() { - @Override public Object call() throws Exception { - while (!done.get()) { - ldr.flush(); - - U.sleep(100); - } - - return null; - } - }, 1, "flusher"); - - // Define index of node being restarted. - final int restartNodeIdx = nodesCntCache + nodesCntNoCache + 1; - - IgniteInternalFuture<?> fut3 = multithreadedAsync(new Callable<Object>() { - @Override public Object call() throws Exception { - try { - for (int i = 0; i < 5; i++) { - Ignite g = startGrid(restartNodeIdx); - - UUID id = g.cluster().localNode().id(); - - info(">>>>>>> Started node: " + id); - - U.sleep(1000); - - stopGrid(getTestGridName(restartNodeIdx), true); - - info(">>>>>>> Stopped node: " + id); - } - } - finally { - done.set(true); - - info("Start stop thread finished."); - } - - return null; - } - }, 1, "start-stop-thread"); - - fut1.get(); - fut2.get(); - fut3.get(); - } - finally { - ldr.close(false); - } - } - finally { - stopAllGrids(); - } - } - - /** - * @throws Exception If failed. - */ - public void testLoaderApi() throws Exception { - useCache = true; - - try { - Ignite g1 = startGrid(1); - - IgniteDataStreamer<Object, Object> ldr = g1.dataStreamer(null); - - ldr.close(false); - - try { - ldr.addData(0, 0); - - assert false; - } - catch (IllegalStateException e) { - info("Caught expected exception: " + e); - } - - assert ldr.future().isDone(); - - ldr.future().get(); - - try { - // Create another loader. - ldr = g1.dataStreamer("UNKNOWN_CACHE"); - - assert false; - } - catch (IllegalStateException e) { - info("Caught expected exception: " + e); - } - - ldr.close(true); - - assert ldr.future().isDone(); - - ldr.future().get(); - - // Create another loader. - ldr = g1.dataStreamer(null); - - // Cancel with future. - ldr.future().cancel(); - - try { - ldr.addData(0, 0); - - assert false; - } - catch (IllegalStateException e) { - info("Caught expected exception: " + e); - } - - assert ldr.future().isDone(); - - try { - ldr.future().get(); - - assert false; - } - catch (IgniteFutureCancelledException e) { - info("Caught expected exception: " + e); - } - - // Create another loader. - ldr = g1.dataStreamer(null); - - // This will close loader. - stopGrid(getTestGridName(1), false); - - try { - ldr.addData(0, 0); - - assert false; - } - catch (IllegalStateException e) { - info("Caught expected exception: " + e); - } - - assert ldr.future().isDone(); - - ldr.future().get(); - } - finally { - stopAllGrids(); - } - } - - /** - * Wraps integer to closure returning it. - * - * @param i Value to wrap. - * @return Callable. - */ - private static Callable<Integer> callable(@Nullable final Integer i) { - return new Callable<Integer>() { - @Override public Integer call() throws Exception { - return i; - } - }; - } - - /** - * Wraps integer to closure returning it. - * - * @param i Value to wrap. - * @return Closure. - */ - private static IgniteClosure<Integer, Integer> closure(@Nullable final Integer i) { - return new IgniteClosure<Integer, Integer>() { - @Override public Integer apply(Integer e) { - return e == null ? i : e + i; - } - }; - } - - /** - * Wraps object to closure returning it. - * - * @param obj Value to wrap. - * @return Closure. - */ - private static <T> IgniteClosure<T, T> fixedClosure(@Nullable final T obj) { - return new IgniteClosure<T, T>() { - @Override public T apply(T e) { - assert e == null || obj == null || e.getClass() == obj.getClass() : - "Expects the same types [e=" + e + ", obj=" + obj + ']'; - - return obj; - } - }; - } - - /** - * Wraps integer to closure expecting it and returning {@code null}. - * - * @param exp Expected closure value. - * @return Remove expected cache value closure. - */ - private static <T> IgniteClosure<T, T> removeClosure(@Nullable final T exp) { - return new IgniteClosure<T, T>() { - @Override public T apply(T act) { - if (exp == null ? act == null : exp.equals(act)) - return null; - - throw new AssertionError("Unexpected value [exp=" + exp + ", act=" + act + ']'); - } - }; - } - - /** - * @throws Exception If failed. - */ - public void testFlush() throws Exception { - mode = LOCAL; - - useCache = true; - - try { - Ignite g = startGrid(); - - final IgniteCache<Integer, Integer> c = g.jcache(null); - - final IgniteDataStreamer<Integer, Integer> ldr = g.dataStreamer(null); - - ldr.perNodeBufferSize(10); - - for (int i = 0; i < 9; i++) - ldr.addData(i, i); - - assertTrue(c.localSize() == 0); - - multithreaded(new Callable<Void>() { - @Override - public Void call() throws Exception { - ldr.flush(); - - assertEquals(9, c.size()); - - return null; - } - }, 5, "flush-checker"); - - ldr.addData(100, 100); - - ldr.flush(); - - assertEquals(10, c.size()); - - ldr.addData(200, 200); - - ldr.close(false); - - ldr.future().get(); - - assertEquals(11, c.size()); - } - finally { - stopAllGrids(); - } - } - - /** - * @throws Exception If failed. - */ - public void testTryFlush() throws Exception { - mode = LOCAL; - - useCache = true; - - try { - Ignite g = startGrid(); - - IgniteCache<Integer, Integer> c = g.jcache(null); - - IgniteDataStreamer<Integer, Integer> ldr = g.dataStreamer(null); - - ldr.perNodeBufferSize(10); - - for (int i = 0; i < 9; i++) - ldr.addData(i, i); - - assertTrue(c.localSize() == 0); - - ldr.tryFlush(); - - Thread.sleep(100); - - assertEquals(9, c.size()); - - ldr.close(false); - } - finally { - stopAllGrids(); - } - } - - /** - * @throws Exception If failed. - */ - public void testFlushTimeout() throws Exception { - mode = LOCAL; - - useCache = true; - - try { - Ignite g = startGrid(); - - final CountDownLatch latch = new CountDownLatch(9); - - g.events().localListen(new IgnitePredicate<Event>() { - @Override public boolean apply(Event evt) { - latch.countDown(); - - return true; - } - }, EVT_CACHE_OBJECT_PUT); - - IgniteCache<Integer, Integer> c = g.jcache(null); - - assertTrue(c.localSize() == 0); - - IgniteDataStreamer<Integer, Integer> ldr = g.dataStreamer(null); - - ldr.perNodeBufferSize(10); - ldr.autoFlushFrequency(3000); - ldr.allowOverwrite(true); - - for (int i = 0; i < 9; i++) - ldr.addData(i, i); - - assertTrue(c.localSize() == 0); - - assertFalse(latch.await(1000, MILLISECONDS)); - - assertTrue(c.localSize() == 0); - - assertTrue(latch.await(3000, MILLISECONDS)); - - assertEquals(9, c.size()); - - ldr.close(false); - } - finally { - stopAllGrids(); - } - } - - /** - * @throws Exception If failed. - */ - public void testUpdateStore() throws Exception { - storeMap = new ConcurrentHashMap<>(); - - try { - store = new TestStore(); - - useCache = true; - - Ignite ignite = startGrid(1); - - startGrid(2); - startGrid(3); - - for (int i = 0; i < 1000; i++) - storeMap.put(i, i); - - try (IgniteDataStreamer<Object, Object> ldr = ignite.dataStreamer(null)) { - ldr.allowOverwrite(true); - - assertFalse(ldr.skipStore()); - - for (int i = 0; i < 1000; i++) - ldr.removeData(i); - - for (int i = 1000; i < 2000; i++) - ldr.addData(i, i); - } - - for (int i = 0; i < 1000; i++) - assertNull(storeMap.get(i)); - - for (int i = 1000; i < 2000; i++) - assertEquals(i, storeMap.get(i)); - - try (IgniteDataStreamer<Object, Object> ldr = ignite.dataStreamer(null)) { - ldr.allowOverwrite(true); - - ldr.skipStore(true); - - for (int i = 0; i < 1000; i++) - ldr.addData(i, i); - - for (int i = 1000; i < 2000; i++) - ldr.removeData(i); - } - - IgniteCache<Object, Object> cache = ignite.jcache(null); - - for (int i = 0; i < 1000; i++) { - assertNull(storeMap.get(i)); - - assertEquals(i, cache.get(i)); - } - - for (int i = 1000; i < 2000; i++) { - assertEquals(i, storeMap.get(i)); - - assertNull(cache.localPeek(i, CachePeekMode.ONHEAP)); - } - } - finally { - storeMap = null; - } - } - - /** - * @throws Exception If failed. - */ - public void testCustomUserUpdater() throws Exception { - useCache = true; - - try { - Ignite ignite = startGrid(1); - - startGrid(2); - startGrid(3); - - try (IgniteDataStreamer<String, TestObject> ldr = ignite.dataStreamer(null)) { - ldr.allowOverwrite(true); - - ldr.updater(new IgniteDataStreamer.Updater<String, TestObject>() { - @Override public void update(IgniteCache<String, TestObject> cache, - Collection<Map.Entry<String, TestObject>> entries) { - for (Map.Entry<String, TestObject> e : entries) { - assertTrue(e.getKey() instanceof String); - assertTrue(e.getValue() instanceof TestObject); - - cache.put(e.getKey(), new TestObject(e.getValue().val + 1)); - } - } - }); - - for (int i = 0; i < 100; i++) - ldr.addData(String.valueOf(i), new TestObject(i)); - } - - IgniteCache<String, TestObject> cache = ignite.jcache(null); - - for (int i = 0; i < 100; i++) { - TestObject val = cache.get(String.valueOf(i)); - - assertNotNull(val); - assertEquals(i + 1, val.val); - } - } - finally { - stopAllGrids(); - } - } - - /** - * - */ - private static class TestObject { - /** Value. */ - private final int val; - - /** - * @param val Value. - */ - private TestObject(int val) { - this.val = val; - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - if (this == o) - return true; - - if (o == null || getClass() != o.getClass()) - return false; - - TestObject obj = (TestObject)o; - - return val == obj.val; - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - return val; - } - } - - /** - * - */ - private static class TestStore extends CacheStoreAdapter<Object, Object> { - /** {@inheritDoc} */ - @Nullable @Override public Object load(Object key) { - return storeMap.get(key); - } - - /** {@inheritDoc} */ - @Override public void write(Cache.Entry<?, ?> entry) { - storeMap.put(entry.getKey(), entry.getValue()); - } - - /** {@inheritDoc} */ - @Override public void delete(Object key) { - storeMap.remove(key); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/60a1a481/modules/core/src/test/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerPerformanceTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerPerformanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerPerformanceTest.java deleted file mode 100644 index 053228c..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerPerformanceTest.java +++ /dev/null @@ -1,197 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.datastream; - -import org.apache.ignite.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.spi.discovery.tcp.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; -import org.apache.ignite.testframework.junits.common.*; -import org.jdk8.backport.*; - -import java.util.concurrent.*; - -import static org.apache.ignite.cache.CacheDistributionMode.*; -import static org.apache.ignite.cache.CacheMode.*; -import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; -import static org.apache.ignite.events.EventType.*; - -/** - * Data streamer performance test. Compares group lock data streamer to traditional lock. - * <p> - * Disable assertions and give at least 2 GB heap to run this test. - */ -public class IgniteDataStreamerPerformanceTest extends GridCommonAbstractTest { - /** */ - private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); - - /** */ - private static final int GRID_CNT = 3; - - /** */ - private static final int ENTRY_CNT = 80000; - - /** */ - private boolean useCache; - - /** */ - private String[] vals = new String[2048]; - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - TcpDiscoverySpi spi = new TcpDiscoverySpi(); - - spi.setIpFinder(IP_FINDER); - - cfg.setDiscoverySpi(spi); - - cfg.setIncludeProperties(); - - cfg.setIncludeEventTypes(EVT_TASK_FAILED, EVT_TASK_FINISHED, EVT_JOB_MAPPED); - - cfg.setConnectorConfiguration(null); - - cfg.setPeerClassLoadingEnabled(true); - - if (useCache) { - CacheConfiguration cc = defaultCacheConfiguration(); - - cc.setCacheMode(PARTITIONED); - - cc.setDistributionMode(PARTITIONED_ONLY); - cc.setWriteSynchronizationMode(FULL_SYNC); - cc.setStartSize(ENTRY_CNT / GRID_CNT); - cc.setSwapEnabled(false); - - cc.setBackups(1); - - cfg.setCacheSanityCheckEnabled(false); - cfg.setCacheConfiguration(cc); - } - else - cfg.setCacheConfiguration(); - - return cfg; - } - - /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - super.beforeTestsStarted(); - - for (int i = 0; i < vals.length; i++) { - int valLen = ThreadLocalRandom8.current().nextInt(128, 512); - - StringBuilder sb = new StringBuilder(); - - for (int j = 0; j < valLen; j++) - sb.append('a' + ThreadLocalRandom8.current().nextInt(20)); - - vals[i] = sb.toString(); - - info("Value: " + vals[i]); - } - } - - /** - * @throws Exception If failed. - */ - public void testPerformance() throws Exception { - doTest(); - } - - /** - * @throws Exception If failed. - */ - private void doTest() throws Exception { - System.gc(); - System.gc(); - System.gc(); - - try { - useCache = true; - - startGridsMultiThreaded(GRID_CNT); - - useCache = false; - - Ignite ignite = startGrid(); - - final IgniteDataStreamer<Integer, String> ldr = ignite.dataStreamer(null); - - ldr.perNodeBufferSize(8192); - ldr.updater(DataStreamerCacheUpdaters.<Integer, String>batchedSorted()); - ldr.autoFlushFrequency(0); - - final LongAdder cnt = new LongAdder(); - - long start = U.currentTimeMillis(); - - Thread t = new Thread(new Runnable() { - @SuppressWarnings("BusyWait") - @Override public void run() { - while (true) { - try { - Thread.sleep(10000); - } - catch (InterruptedException ignored) { - break; - } - - info(">>> Adds/sec: " + cnt.sumThenReset() / 10); - } - } - }); - - t.setDaemon(true); - - t.start(); - - int threadNum = 2;//Runtime.getRuntime().availableProcessors(); - - multithreaded(new Callable<Object>() { - @SuppressWarnings("InfiniteLoopStatement") - @Override public Object call() throws Exception { - ThreadLocalRandom8 rnd = ThreadLocalRandom8.current(); - - while (true) { - int i = rnd.nextInt(ENTRY_CNT); - - ldr.addData(i, vals[rnd.nextInt(vals.length)]); - - cnt.increment(); - } - } - }, threadNum, "loader"); - - info("Closing loader..."); - - ldr.close(false); - - long duration = U.currentTimeMillis() - start; - - info("Finished performance test. Duration: " + duration + "ms."); - } - finally { - stopAllGrids(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/60a1a481/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java new file mode 100644 index 0000000..98737a1 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.datastreamer; + +import org.apache.ignite.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.*; + +import static org.apache.ignite.cache.CacheMode.*; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; + +/** + * Tests for {@code IgniteDataStreamerImpl}. + */ +public class DataStreamerImplSelfTest extends GridCommonAbstractTest { + /** IP finder. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** Number of keys to load via data streamer. */ + private static final int KEYS_COUNT = 1000; + + /** Started grid counter. */ + private static int cnt; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + discoSpi.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(discoSpi); + + // Forth node goes without cache. + if (cnt < 4) + cfg.setCacheConfiguration(cacheConfiguration()); + + cnt++; + + return cfg; + } + + /** + * @throws Exception If failed. + */ + public void testNullPointerExceptionUponDataStreamerClosing() throws Exception { + try { + startGrids(5); + + final CyclicBarrier barrier = new CyclicBarrier(2); + + multithreadedAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + U.awaitQuiet(barrier); + + G.stopAll(true); + + return null; + } + }, 1); + + Ignite g4 = grid(4); + + IgniteDataStreamer<Object, Object> dataLdr = g4.dataStreamer(null); + + dataLdr.perNodeBufferSize(32); + + for (int i = 0; i < 100000; i += 2) { + dataLdr.addData(i, i); + dataLdr.removeData(i + 1); + } + + U.awaitQuiet(barrier); + + info("Closing data streamer."); + + try { + dataLdr.close(true); + } + catch (IllegalStateException ignore) { + // This is ok to ignore this exception as test is racy by it's nature - + // grid is stopping in different thread. + } + } + finally { + G.stopAll(true); + } + } + + /** + * Data streamer should correctly load entries from HashMap in case of grids with more than one node + * and with GridOptimizedMarshaller that requires serializable. + * + * @throws Exception If failed. + */ + public void testAddDataFromMap() throws Exception { + try { + cnt = 0; + + startGrids(2); + + Ignite g0 = grid(0); + + IgniteDataStreamer<Integer, String> dataLdr = g0.dataStreamer(null); + + Map<Integer, String> map = U.newHashMap(KEYS_COUNT); + + for (int i = 0; i < KEYS_COUNT; i ++) + map.put(i, String.valueOf(i)); + + dataLdr.addData(map); + + dataLdr.close(); + + Random rnd = new Random(); + + IgniteCache<Integer, String> c = g0.jcache(null); + + for (int i = 0; i < KEYS_COUNT; i ++) { + Integer k = rnd.nextInt(KEYS_COUNT); + + String v = c.get(k); + + assertEquals(k.toString(), v); + } + } + finally { + G.stopAll(true); + } + } + + /** + * Gets cache configuration. + * + * @return Cache configuration. + */ + private CacheConfiguration cacheConfiguration() { + CacheConfiguration cacheCfg = defaultCacheConfiguration(); + + cacheCfg.setCacheMode(PARTITIONED); + cacheCfg.setBackups(1); + cacheCfg.setWriteSynchronizationMode(FULL_SYNC); + + return cacheCfg; + } + + /** + * + */ + private static class TestObject implements Serializable { + /** */ + private int val; + + /** + */ + private TestObject() { + // No-op. + } + + /** + * @param val Value. + */ + private TestObject(int val) { + this.val = val; + } + + public Integer val() { + return val; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return val; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object obj) { + return obj instanceof TestObject && ((TestObject)obj).val == val; + } + } +}