http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java index db3d350..ed8e573 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java @@ -198,19 +198,14 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed // Remap regular mappings. final Buffer buf = bufMappings.remove(id); + // Only async notification is possible since + // discovery thread may be trapped otherwise. if (buf != null) { - // Only async notification is possible since - // discovery thread may be trapped otherwise. - ctx.closure().callLocalSafe( - new Callable<Object>() { - @Override public Object call() throws Exception { - buf.onNodeLeft(); - - return null; - } - }, - true /* system pool */ - ); + waitAffinityAndRun(new Runnable() { + @Override public void run() { + buf.onNodeLeft(); + } + }, discoEvt.topologyVersion(), true); } } }; @@ -248,6 +243,31 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed } /** + * @param c Closure to run. + * @param topVer Topology version to wait for. + * @param async Async flag. + */ + private void waitAffinityAndRun(final Runnable c, long topVer, boolean async) { + AffinityTopologyVersion topVer0 = new AffinityTopologyVersion(topVer, 0); + + IgniteInternalFuture<?> fut = ctx.cache().context().exchange().affinityReadyFuture(topVer0); + + if (fut != null && !fut.isDone()) { + fut.listen(new CI1<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> fut) { + ctx.closure().runLocalSafe(c, true); + } + }); + } + else { + if (async) + ctx.closure().runLocalSafe(c, true); + else + c.run(); + } + } + + /** * @return Cache object context. 
*/ public CacheObjectContext cacheObjectContext() { @@ -527,6 +547,8 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed boolean initPda = ctx.deploy().enabled() && jobPda == null; + AffinityTopologyVersion topVer = ctx.cache().context().exchange().readyAffinityVersion(); + for (DataStreamerEntry entry : entries) { List<ClusterNode> nodes; @@ -543,7 +565,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed initPda = false; } - nodes = nodes(key); + nodes = nodes(key, topVer); } catch (IgniteCheckedException e) { resFut.onDone(e); @@ -621,10 +643,10 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed } }; - GridFutureAdapter<?> f; + final GridFutureAdapter<?> f; try { - f = buf.update(entriesForNode, lsnr); + f = buf.update(entriesForNode, topVer, lsnr); } catch (IgniteInterruptedCheckedException e1) { resFut.onDone(e1); @@ -633,30 +655,38 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed } if (ctx.discovery().node(nodeId) == null) { - if (bufMappings.remove(nodeId, buf)) - buf.onNodeLeft(); + if (bufMappings.remove(nodeId, buf)) { + final Buffer buf0 = buf; + + waitAffinityAndRun(new Runnable() { + @Override public void run() { + buf0.onNodeLeft(); - if (f != null) - f.onDone(new ClusterTopologyCheckedException("Failed to wait for request completion " + - "(node has left): " + nodeId)); + if (f != null) + f.onDone(new ClusterTopologyCheckedException("Failed to wait for request completion " + + "(node has left): " + nodeId)); + } + }, ctx.discovery().topologyVersion(), false); + } } } } /** * @param key Key to map. + * @param topVer Topology version. * @return Nodes to send requests to. * @throws IgniteCheckedException If failed. */ - private List<ClusterNode> nodes(KeyCacheObject key) throws IgniteCheckedException { + private List<ClusterNode> nodes(KeyCacheObject key, AffinityTopologyVersion topVer) throws IgniteCheckedException { GridAffinityProcessor aff = ctx.affinity(); List<ClusterNode> res = null; if (!allowOverwrite()) - res = aff.mapKeyToPrimaryAndBackups(cacheName, key); + res = aff.mapKeyToPrimaryAndBackups(cacheName, key, topVer); else { - ClusterNode node = aff.mapKeyToNode(cacheName, key); + ClusterNode node = aff.mapKeyToNode(cacheName, key, topVer); if (node != null) res = Collections.singletonList(node); @@ -959,11 +989,13 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed /** * @param newEntries Infos. + * @param topVer Topology version. * @param lsnr Listener for the operation future. * @throws IgniteInterruptedCheckedException If failed. * @return Future for operation. */ @Nullable GridFutureAdapter<?> update(Iterable<DataStreamerEntry> newEntries, + AffinityTopologyVersion topVer, IgniteInClosure<IgniteInternalFuture<?>> lsnr) throws IgniteInterruptedCheckedException { List<DataStreamerEntry> entries0 = null; GridFutureAdapter<Object> curFut0; @@ -986,7 +1018,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed } if (entries0 != null) { - submit(entries0, curFut0); + submit(entries0, topVer, curFut0); if (cancelled) curFut0.onDone(new IgniteCheckedException("Data streamer has been cancelled: " + DataStreamerImpl.this)); @@ -1023,7 +1055,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed } if (entries0 != null) - submit(entries0, curFut0); + submit(entries0, null, curFut0); // Create compound future for this flush. 
GridCompoundFuture<Object, Object> res = null; @@ -1068,10 +1100,13 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed /** * @param entries Entries to submit. + * @param topVer Topology version. * @param curFut Current future. * @throws IgniteInterruptedCheckedException If interrupted. */ - private void submit(final Collection<DataStreamerEntry> entries, final GridFutureAdapter<Object> curFut) + private void submit(final Collection<DataStreamerEntry> entries, + @Nullable AffinityTopologyVersion topVer, + final GridFutureAdapter<Object> curFut) throws IgniteInterruptedCheckedException { assert entries != null; assert !entries.isEmpty(); @@ -1160,6 +1195,9 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed reqs.put(reqId, (GridFutureAdapter<Object>)fut); + if (topVer == null) + topVer = ctx.cache().context().exchange().readyAffinityVersion(); + DataStreamerRequest req = new DataStreamerRequest( reqId, topicBytes, @@ -1174,7 +1212,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed dep != null ? dep.participants() : null, dep != null ? dep.classLoaderId() : null, dep == null, - ctx.cache().context().exchange().readyAffinityVersion()); + topVer); try { ctx.io().send(node, TOPIC_DATASTREAM, req, PUBLIC_POOL); @@ -1422,6 +1460,8 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed cctx.evicts().touch(entry, topVer); CU.unwindEvicts(cctx); + + entry.onUnlock(); } catch (GridDhtInvalidPartitionException | GridCacheEntryRemovedException ignored) { // No-op.
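Note on the DataStreamerImpl change above: the new waitAffinityAndRun helper defers the buffer's node-left cleanup until affinity for the given topology version is ready, either by listening on a not-yet-completed ready future or by running at once (asynchronously on the system pool, or inline when async is false). Below is a minimal standalone sketch of that listen-or-run-now pattern; FutureLike and the Executor are stand-ins for IgniteInternalFuture and the closure processor, illustrative only and not Ignite APIs.

    import java.util.concurrent.Executor;

    /** Stand-in for IgniteInternalFuture: a done-check plus a completion listener. */
    interface FutureLike {
        boolean isDone();

        void listen(Runnable lsnr); // Listener is invoked when the future completes.
    }

    /** Sketch of the listen-or-run-now pattern used by waitAffinityAndRun. */
    final class WaitAndRunSketch {
        static void waitAndRun(FutureLike fut, final Runnable task, boolean async, final Executor pool) {
            if (fut != null && !fut.isDone()) {
                // Topology not ready yet: defer the task until the future completes.
                fut.listen(new Runnable() {
                    @Override public void run() {
                        pool.execute(task);
                    }
                });
            }
            else if (async)
                pool.execute(task); // Already ready, but still hop off the caller's thread.
            else
                task.run(); // Already ready and synchronous execution requested.
        }
    }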
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java index 72911af..aa3bfe2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.datastructures; import org.apache.ignite.*; +import org.apache.ignite.cache.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.cluster.*; @@ -32,6 +33,7 @@ import org.apache.ignite.lang.*; import org.jetbrains.annotations.*; import org.jsr166.*; +import javax.cache.event.*; import javax.cache.processor.*; import java.io.*; import java.util.*; @@ -40,7 +42,6 @@ import java.util.concurrent.*; import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.*; import static org.apache.ignite.cache.CacheRebalanceMode.*; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; -import static org.apache.ignite.internal.processors.cache.GridCacheOperation.*; import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.*; import static org.apache.ignite.transactions.TransactionConcurrency.*; import static org.apache.ignite.transactions.TransactionIsolation.*; @@ -99,6 +100,9 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { /** */ private IgniteInternalCache<CacheDataStructuresCacheKey, List<CacheCollectionInfo>> utilityDataCache; + /** */ + private volatile UUID qryId; + /** * @param ctx Context. */ @@ -112,7 +116,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override public void onKernalStart() { + @Override public void onKernalStart() throws IgniteCheckedException { if (ctx.config().isDaemon()) return; @@ -139,11 +143,35 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { seqView = atomicsCache; - dsCacheCtx = ctx.cache().internalCache(CU.ATOMICS_CACHE_NAME).context(); + dsCacheCtx = atomicsCache.context(); } } /** + * @throws IgniteCheckedException If failed. + */ + private void startQuery() throws IgniteCheckedException { + if (qryId == null) { + synchronized (this) { + if (qryId == null) { + qryId = dsCacheCtx.continuousQueries().executeInternalQuery(new DataStructuresEntryListener(), + new DataStructuresEntryFilter(), + dsCacheCtx.isReplicated() && dsCacheCtx.affinityNode(), + false); + } + } + } + } + + /** {@inheritDoc} */ + @Override public void onKernalStop(boolean cancel) { + super.onKernalStop(cancel); + + if (qryId != null) + dsCacheCtx.continuousQueries().cancelInternalQuery(qryId); + } + + /** * Gets a sequence from cache or creates one if it's not cached. * * @param name Sequence name. 
@@ -161,6 +189,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { checkAtomicsConfiguration(); + startQuery(); + return getAtomic(new IgniteOutClosureX<IgniteAtomicSequence>() { @Override public IgniteAtomicSequence applyx() throws IgniteCheckedException { GridCacheInternalKey key = new GridCacheInternalKeyImpl(name); @@ -287,6 +317,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { checkAtomicsConfiguration(); + startQuery(); + return getAtomic(new IgniteOutClosureX<IgniteAtomicLong>() { @Override public IgniteAtomicLong applyx() throws IgniteCheckedException { final GridCacheInternalKey key = new GridCacheInternalKeyImpl(name); @@ -490,6 +522,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { checkAtomicsConfiguration(); + startQuery(); + return getAtomic(new IgniteOutClosureX<IgniteAtomicReference>() { @Override public IgniteAtomicReference<T> applyx() throws IgniteCheckedException { GridCacheInternalKey key = new GridCacheInternalKeyImpl(name); @@ -591,6 +625,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { checkAtomicsConfiguration(); + startQuery(); + return getAtomic(new IgniteOutClosureX<IgniteAtomicStamped>() { @Override public IgniteAtomicStamped<T, S> applyx() throws IgniteCheckedException { GridCacheInternalKeyImpl key = new GridCacheInternalKeyImpl(name); @@ -899,6 +935,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { checkAtomicsConfiguration(); + startQuery(); + return getAtomic(new IgniteOutClosureX<IgniteCountDownLatch>() { @Override public IgniteCountDownLatch applyx() throws IgniteCheckedException { GridCacheInternalKey key = new GridCacheInternalKeyImpl(name); @@ -906,8 +944,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { dsCacheCtx.gate().enter(); try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) { - GridCacheCountDownLatchValue val = cast(dsView.get(key), - GridCacheCountDownLatchValue.class); + GridCacheCountDownLatchValue val = cast(dsView.get(key), GridCacheCountDownLatchValue.class); // Check that count down hasn't been created in other thread yet. GridCacheCountDownLatchEx latch = cast(dsMap.get(key), GridCacheCountDownLatchEx.class); @@ -1034,28 +1071,46 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { } /** - * Transaction committed callback for transaction manager. * - * @param tx Committed transaction. */ - public <K, V> void onTxCommitted(IgniteInternalTx tx) { - if (dsCacheCtx == null) - return; + static class DataStructuresEntryFilter implements CacheEntryEventSerializableFilter<Object, Object> { + /** */ + private static final long serialVersionUID = 0L; - if (!dsCacheCtx.isDht() && tx.internal() && (!dsCacheCtx.isColocated() || dsCacheCtx.isReplicated())) { - Collection<IgniteTxEntry> entries = tx.writeEntries(); + /** {@inheritDoc} */ + @Override public boolean evaluate(CacheEntryEvent<?, ?> evt) throws CacheEntryListenerException { + if (evt.getEventType() == EventType.CREATED || evt.getEventType() == EventType.UPDATED) + return evt.getValue() instanceof GridCacheCountDownLatchValue; + else { + assert evt.getEventType() == EventType.REMOVED : evt; - if (log.isDebugEnabled()) - log.debug("Committed entries: " + entries); + return true; + } + } - for (IgniteTxEntry entry : entries) { - // Check updated or created GridCacheInternalKey keys. 
- if ((entry.op() == CREATE || entry.op() == UPDATE) && entry.key().internal()) { - GridCacheInternal key = entry.key().value(entry.context().cacheObjectContext(), false); + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(DataStructuresEntryFilter.class, this); + } + } - Object val0 = CU.value(entry.value(), entry.context(), false); + /** + * + */ + private class DataStructuresEntryListener implements + CacheEntryUpdatedListener<GridCacheInternalKey, GridCacheInternal> { + /** {@inheritDoc} */ + @Override public void onUpdated( + Iterable<CacheEntryEvent<? extends GridCacheInternalKey, ? extends GridCacheInternal>> evts) + throws CacheEntryListenerException + { + for (CacheEntryEvent<? extends GridCacheInternalKey, ? extends GridCacheInternal> evt : evts) { + if (evt.getEventType() == EventType.CREATED || evt.getEventType() == EventType.UPDATED) { + GridCacheInternal val0 = evt.getValue(); if (val0 instanceof GridCacheCountDownLatchValue) { + GridCacheInternalKey key = evt.getKey(); + // Notify latch on changes. GridCacheRemovable latch = dsMap.get(key); @@ -1067,8 +1122,6 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { latch0.onUpdate(val.get()); if (val.get() == 0 && val.autoDelete()) { - entry.cached().markObsolete(dsCacheCtx.versions().next()); - dsMap.remove(key); latch.onRemoved(); @@ -1080,11 +1133,12 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { ", actual=" + latch.getClass() + ", value=" + latch + ']'); } } + } + else { + assert evt.getEventType() == EventType.REMOVED : evt; - // Check deleted GridCacheInternal keys. - if (entry.op() == DELETE && entry.key().internal()) { - GridCacheInternal key = entry.key().value(entry.context().cacheObjectContext(), false); + GridCacheInternal key = evt.getKey(); // Entry's val is null if entry deleted. GridCacheRemovable obj = dsMap.remove(key); @@ -1094,6 +1148,11 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { } } } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(DataStructuresEntryListener.class, this); + } } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJob.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJob.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJob.java index 65cb48d..5fd6c81 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJob.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJob.java @@ -98,5 +98,5 @@ public interface HadoopJob { /** * Cleans up the job staging directory. 
*/ - void cleanupStagingDirectory(); + public void cleanupStagingDirectory(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java index 371fd81..3d2ee17 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java @@ -21,13 +21,14 @@ import org.apache.ignite.*; import org.apache.ignite.internal.processors.hadoop.counter.*; import java.util.*; +import java.util.concurrent.*; /** * Task context. */ public abstract class HadoopTaskContext { /** */ - private final HadoopJob job; + protected final HadoopJob job; /** */ private HadoopTaskInput input; @@ -187,4 +188,15 @@ public abstract class HadoopTaskContext { * @throws IgniteCheckedException If failed. */ public abstract void cleanupTaskEnvironment() throws IgniteCheckedException; + + /** + * Executes a callable on behalf of the job owner. + * In case of embedded task execution the implementation of this method + * will use classes loaded by the ClassLoader this HadoopTaskContext was loaded with. + * @param c The callable. + * @param <T> The return type of the Callable. + * @return The result of the callable. + * @throws IgniteCheckedException On any error in callable. + */ + public abstract <T> T runAsJobOwner(Callable<T> c) throws IgniteCheckedException; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java index 7c1a837..361f75f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java @@ -48,8 +48,12 @@ public interface IgfsEx extends IgniteFileSystem { /** Property name for URI of file system. */ public static final String SECONDARY_FS_URI = "SECONDARY_FS_URI"; - /** Property name for user name of file system. */ - public static final String SECONDARY_FS_USER_NAME = "SECONDARY_FS_USER_NAME"; + /** Property name for default user name of file system. + * NOTE: for a secondary file system this is just a default user name, which is used + * when the secondary file system is used outside of any user context. + * If another user name is set in the context, the secondary file system will work on behalf + * of that user, which is different from the default. */ + public static final String SECONDARY_FS_USER_NAME = "SECONDARY_FS_USER_NAME"; /** * Stops IGFS cleaning all used resources.
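The runAsJobOwner method introduced in HadoopTaskContext above is only a contract at this level: the callable runs on behalf of the job owner and, for embedded execution, with the classes loaded by this context's class loader. The following is a self-contained sketch of the class-loader part of that contract using nothing beyond the JDK; it illustrates the documented behaviour and is not the actual Ignite implementation.

    import java.util.concurrent.Callable;

    /** Illustrative runner: execute a callable under this class's own class loader. */
    final class OwnerContextSketch {
        <T> T runAsOwner(Callable<T> c) throws Exception {
            Thread t = Thread.currentThread();

            ClassLoader prev = t.getContextClassLoader();

            // Switch to the loader this sketch was loaded with, mirroring the
            // "classes loaded by the ClassLoader this HadoopTaskContext was loaded with" note.
            t.setContextClassLoader(getClass().getClassLoader());

            try {
                return c.call();
            }
            finally {
                t.setContextClassLoader(prev); // Always restore the caller's loader.
            }
        }
    }

An actual implementation would typically also run the callable under the owner's Hadoop user context (for example via UserGroupInformation), which this sketch deliberately omits.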
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java index 34636d2..c3495e9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java @@ -245,8 +245,12 @@ public final class IgfsImpl implements IgfsEx { for (IgfsFileWorkerBatch batch : workerMap.values()) batch.cancel(); - if (secondaryFs instanceof AutoCloseable) - U.closeQuiet((AutoCloseable)secondaryFs); + try { + secondaryFs.close(); + } + catch (Exception e) { + log.error("Failed to close secondary file system.", e); + } } igfsCtx.kernalContext().io().removeMessageListener(topic, delMsgLsnr); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java index 8a8b858..cfe6ed4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java @@ -51,10 +51,10 @@ class IgfsIpcHandler implements IgfsServerHandler { private final int bufSize; // Buffer size. Must not be less then file block size. /** IGFS instance for this handler. */ - private IgfsEx igfs; + private final IgfsEx igfs; /** Resource ID generator. */ - private AtomicLong rsrcIdGen = new AtomicLong(); + private final AtomicLong rsrcIdGen = new AtomicLong(); /** Stopping flag. */ private volatile boolean stopping; @@ -241,138 +241,148 @@ class IgfsIpcHandler implements IgfsServerHandler { * @return Response message. * @throws IgniteCheckedException If failed. 
*/ - private IgfsMessage processPathControlRequest(IgfsClientSession ses, IgfsIpcCommand cmd, + private IgfsMessage processPathControlRequest(final IgfsClientSession ses, final IgfsIpcCommand cmd, IgfsMessage msg) throws IgniteCheckedException { - IgfsPathControlRequest req = (IgfsPathControlRequest)msg; + final IgfsPathControlRequest req = (IgfsPathControlRequest)msg; if (log.isDebugEnabled()) log.debug("Processing path control request [igfsName=" + igfs.name() + ", req=" + req + ']'); - IgfsControlResponse res = new IgfsControlResponse(); + final IgfsControlResponse res = new IgfsControlResponse(); + + final String userName = req.userName(); + + assert userName != null; try { - switch (cmd) { - case EXISTS: - res.response(igfs.exists(req.path())); + IgfsUserContext.doAs(userName, new IgniteOutClosure<Object>() { + @Override public Void apply() { + switch (cmd) { + case EXISTS: + res.response(igfs.exists(req.path())); - break; + break; - case INFO: - res.response(igfs.info(req.path())); + case INFO: + res.response(igfs.info(req.path())); - break; + break; - case PATH_SUMMARY: - res.response(igfs.summary(req.path())); + case PATH_SUMMARY: + res.response(igfs.summary(req.path())); - break; + break; - case UPDATE: - res.response(igfs.update(req.path(), req.properties())); + case UPDATE: + res.response(igfs.update(req.path(), req.properties())); - break; + break; - case RENAME: - igfs.rename(req.path(), req.destinationPath()); + case RENAME: + igfs.rename(req.path(), req.destinationPath()); - res.response(true); + res.response(true); - break; + break; - case DELETE: - res.response(igfs.delete(req.path(), req.flag())); + case DELETE: + res.response(igfs.delete(req.path(), req.flag())); - break; + break; - case MAKE_DIRECTORIES: - igfs.mkdirs(req.path(), req.properties()); + case MAKE_DIRECTORIES: + igfs.mkdirs(req.path(), req.properties()); - res.response(true); + res.response(true); - break; + break; - case LIST_PATHS: - res.paths(igfs.listPaths(req.path())); + case LIST_PATHS: + res.paths(igfs.listPaths(req.path())); - break; + break; - case LIST_FILES: - res.files(igfs.listFiles(req.path())); + case LIST_FILES: + res.files(igfs.listFiles(req.path())); - break; + break; - case SET_TIMES: - igfs.setTimes(req.path(), req.accessTime(), req.modificationTime()); + case SET_TIMES: + igfs.setTimes(req.path(), req.accessTime(), req.modificationTime()); - res.response(true); + res.response(true); - break; + break; - case AFFINITY: - res.locations(igfs.affinity(req.path(), req.start(), req.length())); + case AFFINITY: + res.locations(igfs.affinity(req.path(), req.start(), req.length())); - break; + break; - case OPEN_READ: { - IgfsInputStreamAdapter igfsIn = !req.flag() ? igfs.open(req.path(), bufSize) : - igfs.open(req.path(), bufSize, req.sequentialReadsBeforePrefetch()); + case OPEN_READ: { + IgfsInputStreamAdapter igfsIn = !req.flag() ? 
igfs.open(req.path(), bufSize) : + igfs.open(req.path(), bufSize, req.sequentialReadsBeforePrefetch()); - long streamId = registerResource(ses, igfsIn); + long streamId = registerResource(ses, igfsIn); - if (log.isDebugEnabled()) - log.debug("Opened IGFS input stream for file read [igfsName=" + igfs.name() + ", path=" + - req.path() + ", streamId=" + streamId + ", ses=" + ses + ']'); + if (log.isDebugEnabled()) + log.debug("Opened IGFS input stream for file read [igfsName=" + igfs.name() + ", path=" + + req.path() + ", streamId=" + streamId + ", ses=" + ses + ']'); - IgfsFileInfo info = new IgfsFileInfo(igfsIn.fileInfo(), null, - igfsIn.fileInfo().modificationTime()); + IgfsFileInfo info = new IgfsFileInfo(igfsIn.fileInfo(), null, + igfsIn.fileInfo().modificationTime()); - res.response(new IgfsInputStreamDescriptor(streamId, info.length())); + res.response(new IgfsInputStreamDescriptor(streamId, info.length())); - break; - } + break; + } - case OPEN_CREATE: { - long streamId = registerResource(ses, igfs.create( - req.path(), // Path. - bufSize, // Buffer size. - req.flag(), // Overwrite if exists. - affinityKey(req), // Affinity key based on replication factor. - req.replication(),// Replication factor. - req.blockSize(), // Block size. - req.properties() // File properties. - )); + case OPEN_CREATE: { + long streamId = registerResource(ses, igfs.create( + req.path(), // Path. + bufSize, // Buffer size. + req.flag(), // Overwrite if exists. + affinityKey(req), // Affinity key based on replication factor. + req.replication(),// Replication factor. + req.blockSize(), // Block size. + req.properties() // File properties. + )); - if (log.isDebugEnabled()) - log.debug("Opened IGFS output stream for file create [igfsName=" + igfs.name() + ", path=" + - req.path() + ", streamId=" + streamId + ", ses=" + ses + ']'); + if (log.isDebugEnabled()) + log.debug("Opened IGFS output stream for file create [igfsName=" + igfs.name() + ", path=" + + req.path() + ", streamId=" + streamId + ", ses=" + ses + ']'); - res.response(streamId); + res.response(streamId); - break; - } + break; + } - case OPEN_APPEND: { - long streamId = registerResource(ses, igfs.append( - req.path(), // Path. - bufSize, // Buffer size. - req.flag(), // Create if absent. - req.properties() // File properties. - )); + case OPEN_APPEND: { + long streamId = registerResource(ses, igfs.append( + req.path(), // Path. + bufSize, // Buffer size. + req.flag(), // Create if absent. + req.properties() // File properties. 
+ )); - if (log.isDebugEnabled()) - log.debug("Opened IGFS output stream for file append [igfsName=" + igfs.name() + ", path=" + - req.path() + ", streamId=" + streamId + ", ses=" + ses + ']'); + if (log.isDebugEnabled()) + log.debug("Opened IGFS output stream for file append [igfsName=" + igfs.name() + ", path=" + + req.path() + ", streamId=" + streamId + ", ses=" + ses + ']'); - res.response(streamId); + res.response(streamId); - break; - } + break; + } - default: - assert false : "Unhandled path control request command: " + cmd; + default: + assert false : "Unhandled path control request command: " + cmd; - break; - } + break; + } + + return null; + } + }); } catch (IgniteException e) { throw new IgniteCheckedException(e); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java index e33e0d4..b98c5d8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java @@ -669,7 +669,7 @@ public class IgfsMetaManager extends IgfsManager { private Map<String, IgfsListingEntry> directoryListing(IgniteUuid fileId, boolean skipTx) throws IgniteCheckedException { assert fileId != null; - IgfsFileInfo info = skipTx ? id2InfoPrj.getAllOutTx(Collections.singletonList(fileId)).get(fileId) : + IgfsFileInfo info = skipTx ? id2InfoPrj.getAllOutTx(Collections.singleton(fileId)).get(fileId) : id2InfoPrj.get(fileId); return info == null ? Collections.<String, IgfsListingEntry>emptyMap() : info.listing(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java index 683b317..44ee90f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java @@ -30,14 +30,14 @@ import java.util.*; */ class IgfsSecondaryFileSystemImpl implements IgfsSecondaryFileSystem { /** Delegate. */ - private final IgfsImpl igfs; + private final IgfsEx igfs; /** * Constructor. * * @param igfs Delegate. */ - IgfsSecondaryFileSystemImpl(IgfsImpl igfs) { + IgfsSecondaryFileSystemImpl(IgfsEx igfs) { this.igfs = igfs; } @@ -118,4 +118,9 @@ class IgfsSecondaryFileSystemImpl implements IgfsSecondaryFileSystem { @Override public Map<String, String> properties() { return Collections.emptyMap(); } + + /** {@inheritDoc} */ + @Override public void close() throws IgniteException { + // No-op. 
+ } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java index 253d5be..caa6866 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java @@ -239,13 +239,13 @@ public class IgfsServer { */ private class ClientWorker extends GridWorker { /** Connected client endpoint. */ - private IpcEndpoint endpoint; + private final IpcEndpoint endpoint; /** Data output stream. */ private final IgfsDataOutputStream out; /** Client session object. */ - private IgfsClientSession ses; + private final IgfsClientSession ses; /** Queue node for fast unlink. */ private ConcurrentLinkedDeque8.Node<ClientWorker> node; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java index 4b0234f..8026a44 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java @@ -18,9 +18,11 @@ package org.apache.ignite.internal.processors.igfs; import org.apache.ignite.*; +import org.apache.ignite.configuration.*; import org.apache.ignite.igfs.*; import org.apache.ignite.internal.cluster.*; import org.apache.ignite.internal.util.typedef.*; +import org.jetbrains.annotations.*; import java.lang.reflect.*; @@ -88,4 +90,18 @@ public class IgfsUtils { private IgfsUtils() { // No-op. } + + /** + * Provides non-null user name. + * If the user name is null or empty string, defaults to {@link FileSystemConfiguration#DFLT_USER_NAME}, + * which is the current process owner user. + * @param user a user name to be fixed. + * @return non-null interned user name. 
+ */ + public static String fixUserName(@Nullable String user) { + if (F.isEmpty(user)) + user = FileSystemConfiguration.DFLT_USER_NAME; + + return user; + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index cd4d543..ed8e1e2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -121,7 +121,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { if (F.isEmpty(meta.getValueType())) throw new IgniteCheckedException("Value type is not set: " + meta); - TypeDescriptor desc = new TypeDescriptor(ccfg); + TypeDescriptor desc = new TypeDescriptor(); Class<?> valCls = U.classForName(meta.getValueType(), null); @@ -160,7 +160,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { Class<?> keyCls = clss[i]; Class<?> valCls = clss[i + 1]; - TypeDescriptor desc = processKeyAndValueClasses(ccfg, keyCls, valCls); + TypeDescriptor desc = processKeyAndValueClasses(keyCls, valCls); addTypeByName(ccfg, desc); types.put(new TypeId(ccfg.getName(), valCls), desc); @@ -188,15 +188,17 @@ public class GridQueryProcessor extends GridProcessorAdapter { } /** - * @param ccfg Cache configuration. * @param keyCls Key class. * @param valCls Value class. * @return Type descriptor. * @throws IgniteCheckedException If failed. */ - private TypeDescriptor processKeyAndValueClasses(CacheConfiguration<?,?> ccfg, Class<?> keyCls, Class<?> valCls) + private TypeDescriptor processKeyAndValueClasses( + Class<?> keyCls, + Class<?> valCls + ) throws IgniteCheckedException { - TypeDescriptor d = new TypeDescriptor(ccfg); + TypeDescriptor d = new TypeDescriptor(); d.keyClass(keyCls); d.valueClass(valCls); @@ -318,7 +320,12 @@ public class GridQueryProcessor extends GridProcessorAdapter { throw new IllegalStateException("Failed to rebuild indexes (grid is stopping)."); try { - return rebuildIndexes(space, typesByName.get(new TypeName(space, valTypeName))); + return rebuildIndexes( + space, + typesByName.get( + new TypeName( + space, + valTypeName))); } finally { busyLock.leaveBusy(); @@ -539,7 +546,9 @@ public class GridQueryProcessor extends GridProcessorAdapter { throw new IllegalStateException("Failed to execute query (grid is stopping)."); try { - return idx.queryTwoStep(ctx.cache().internalCache(space).context(), qry); + return idx.queryTwoStep( + ctx.cache().internalCache(space).context(), + qry); } finally { busyLock.leaveBusy(); @@ -589,59 +598,62 @@ public class GridQueryProcessor extends GridProcessorAdapter { * @param qry Query. * @return Cursor. 
*/ - public <K,V> Iterator<Cache.Entry<K,V>> queryLocal(GridCacheContext<?,?> cctx, SqlQuery qry) { + public <K, V> Iterator<Cache.Entry<K, V>> queryLocal(final GridCacheContext<?, ?> cctx, final SqlQuery qry) { if (!busyLock.enterBusy()) throw new IllegalStateException("Failed to execute query (grid is stopping)."); try { - String space = cctx.name(); - String type = qry.getType(); - String sqlQry = qry.getSql(); - Object[] params = qry.getArgs(); - - TypeDescriptor typeDesc = typesByName.get(new TypeName(space, type)); - - if (typeDesc == null || !typeDesc.registered()) - throw new CacheException("Failed to find SQL table for type: " + type); - - final GridCloseableIterator<IgniteBiTuple<K,V>> i = idx.query(space, sqlQry, F.asList(params), typeDesc, - idx.backupFilter()); - - if (ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED)) { - ctx.event().record(new CacheQueryExecutedEvent<>( - ctx.discovery().localNode(), - "SQL query executed.", - EVT_CACHE_QUERY_EXECUTED, - CacheQueryType.SQL.name(), - null, - null, - sqlQry, - null, - null, - params, - null, - null)); - } - - return new ClIter<Cache.Entry<K,V>>() { - @Override public void close() throws Exception { - i.close(); - } - - @Override public boolean hasNext() { - return i.hasNext(); - } - - @Override public Cache.Entry<K,V> next() { - IgniteBiTuple<K,V> t = i.next(); - - return new CacheEntryImpl<>(t.getKey(), t.getValue()); - } - - @Override public void remove() { - throw new UnsupportedOperationException(); - } - }; + return executeQuery( + cctx, + new IgniteOutClosureX<Iterator<Cache.Entry<K, V>>>() { + @Override public Iterator<Cache.Entry<K, V>> applyx() throws IgniteCheckedException { + String space = cctx.name(); + String type = qry.getType(); + String sqlQry = qry.getSql(); + Object[] params = qry.getArgs(); + + TypeDescriptor typeDesc = typesByName.get( + new TypeName( + space, + type)); + + if (typeDesc == null || !typeDesc.registered()) + throw new CacheException("Failed to find SQL table for type: " + type); + + final GridCloseableIterator<IgniteBiTuple<K, V>> i = idx.query( + space, + sqlQry, + F.asList(params), + typeDesc, + idx.backupFilter()); + + sendQueryExecutedEvent( + sqlQry, + params); + + return new ClIter<Cache.Entry<K, V>>() { + @Override public void close() throws Exception { + i.close(); + } + + @Override public boolean hasNext() { + return i.hasNext(); + } + + @Override public Cache.Entry<K, V> next() { + IgniteBiTuple<K, V> t = i.next(); + + return new CacheEntryImpl<>( + t.getKey(), + t.getValue()); + } + + @Override public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }); } catch (IgniteCheckedException e) { throw new IgniteException(e); @@ -652,6 +664,28 @@ public class GridQueryProcessor extends GridProcessorAdapter { } /** + * @param sqlQry Sql query. + * @param params Params. + */ + private void sendQueryExecutedEvent(String sqlQry, Object[] params) { + if (ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED)) { + ctx.event().record(new CacheQueryExecutedEvent<>( + ctx.discovery().localNode(), + "SQL query executed.", + EVT_CACHE_QUERY_EXECUTED, + CacheQueryType.SQL.name(), + null, + null, + sqlQry, + null, + null, + params, + null, + null)); + } + } + + /** * @return Message factory for {@link GridIoManager}. */ public MessageFactory messageFactory() { @@ -670,39 +704,29 @@ public class GridQueryProcessor extends GridProcessorAdapter { * @param qry Query. * @return Iterator. 
*/ - public QueryCursor<List<?>> queryLocalFields(GridCacheContext<?,?> cctx, SqlFieldsQuery qry) { + public QueryCursor<List<?>> queryLocalFields(final GridCacheContext<?,?> cctx, final SqlFieldsQuery qry) { if (!busyLock.enterBusy()) throw new IllegalStateException("Failed to execute query (grid is stopping)."); try { - String space = cctx.name(); - String sql = qry.getSql(); - Object[] args = qry.getArgs(); - - GridQueryFieldsResult res = idx.queryFields(space, sql, F.asList(args), idx.backupFilter()); - - if (ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED)) { - ctx.event().record(new CacheQueryExecutedEvent<>( - ctx.discovery().localNode(), - "SQL query executed.", - EVT_CACHE_QUERY_EXECUTED, - CacheQueryType.SQL.name(), - null, - null, - sql, - null, - null, - args, - null, - null)); - } + return executeQuery(cctx, new IgniteOutClosureX<QueryCursor<List<?>>>() { + @Override public QueryCursor<List<?>> applyx() throws IgniteCheckedException { + String space = cctx.name(); + String sql = qry.getSql(); + Object[] args = qry.getArgs(); - QueryCursorImpl<List<?>> cursor = new QueryCursorImpl<>( - new GridQueryCacheObjectsIterator(res.iterator(), cctx, cctx.keepPortable())); + GridQueryFieldsResult res = idx.queryFields(space, sql, F.asList(args), idx.backupFilter()); - cursor.fieldsMeta(res.metaData()); + sendQueryExecutedEvent(sql, args); - return cursor; + QueryCursorImpl<List<?>> cursor = new QueryCursorImpl<>( + new GridQueryCacheObjectsIterator(res.iterator(), cctx, cctx.keepPortable())); + + cursor.fieldsMeta(res.metaData()); + + return cursor; + } + }); } catch (IgniteCheckedException e) { throw new CacheException(e); @@ -793,7 +817,11 @@ public class GridQueryProcessor extends GridProcessorAdapter { if (type == null || !type.registered()) throw new CacheException("Failed to find SQL table for type: " + resType); - return idx.queryText(space, clause, type, filters); + return idx.queryText( + space, + clause, + type, + filters); } finally { busyLock.leaveBusy(); @@ -808,7 +836,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { * @return Field rows. * @throws IgniteCheckedException If failed. 
*/ - public <K, V> GridQueryFieldsResult queryFields(@Nullable String space, String clause, Collection<Object> params, + public GridQueryFieldsResult queryFields(@Nullable String space, String clause, Collection<Object> params, IndexingQueryFilter filters) throws IgniteCheckedException { checkEnabled(); @@ -837,7 +865,11 @@ public class GridQueryProcessor extends GridProcessorAdapter { if (ctx.indexing().enabled()) { CacheObjectContext coctx = cacheObjectContext(spaceName); - ctx.indexing().onSwap(spaceName, key.value(coctx, false)); + ctx.indexing().onSwap( + spaceName, + key.value( + coctx, + false)); } if (idx == null) @@ -847,7 +879,9 @@ public class GridQueryProcessor extends GridProcessorAdapter { throw new IllegalStateException("Failed to process swap event (grid is stopping)."); try { - idx.onSwap(spaceName, key); + idx.onSwap( + spaceName, + key); } finally { busyLock.leaveBusy(); @@ -1067,7 +1101,11 @@ public class GridQueryProcessor extends GridProcessorAdapter { assert valCls != null; for (Map.Entry<String, Class<?>> entry : meta.getAscendingFields().entrySet()) { - ClassProperty prop = buildClassProperty(keyCls, valCls, entry.getKey(), entry.getValue()); + ClassProperty prop = buildClassProperty( + keyCls, + valCls, + entry.getKey(), + entry.getValue()); d.addProperty(prop, false); @@ -1079,7 +1117,11 @@ public class GridQueryProcessor extends GridProcessorAdapter { } for (Map.Entry<String, Class<?>> entry : meta.getDescendingFields().entrySet()) { - ClassProperty prop = buildClassProperty(keyCls, valCls, entry.getKey(), entry.getValue()); + ClassProperty prop = buildClassProperty( + keyCls, + valCls, + entry.getKey(), + entry.getValue()); d.addProperty(prop, false); @@ -1091,7 +1133,11 @@ public class GridQueryProcessor extends GridProcessorAdapter { } for (String txtIdx : meta.getTextFields()) { - ClassProperty prop = buildClassProperty(keyCls, valCls, txtIdx, String.class); + ClassProperty prop = buildClassProperty( + keyCls, + valCls, + txtIdx, + String.class); d.addProperty(prop, false); @@ -1109,7 +1155,11 @@ public class GridQueryProcessor extends GridProcessorAdapter { int order = 0; for (Map.Entry<String, IgniteBiTuple<Class<?>, Boolean>> idxField : idxFields.entrySet()) { - ClassProperty prop = buildClassProperty(keyCls, valCls, idxField.getKey(), idxField.getValue().get1()); + ClassProperty prop = buildClassProperty( + keyCls, + valCls, + idxField.getKey(), + idxField.getValue().get1()); d.addProperty(prop, false); @@ -1123,7 +1173,11 @@ public class GridQueryProcessor extends GridProcessorAdapter { } for (Map.Entry<String, Class<?>> entry : meta.getQueryFields().entrySet()) { - ClassProperty prop = buildClassProperty(keyCls, valCls, entry.getKey(), entry.getValue()); + ClassProperty prop = buildClassProperty( + keyCls, + valCls, + entry.getKey(), + entry.getValue()); d.addProperty(prop, false); } @@ -1231,7 +1285,11 @@ public class GridQueryProcessor extends GridProcessorAdapter { */ private static ClassProperty buildClassProperty(Class<?> keyCls, Class<?> valCls, String pathStr, Class<?> resType) throws IgniteCheckedException { - ClassProperty res = buildClassProperty(true, keyCls, pathStr, resType); + ClassProperty res = buildClassProperty( + true, + keyCls, + pathStr, + resType); if (res == null) // We check key before value consistently with PortableProperty. res = buildClassProperty(false, valCls, pathStr, resType); @@ -1330,6 +1388,59 @@ public class GridQueryProcessor extends GridProcessorAdapter { } /** + * @param cctx Cache context. + * @param clo Closure. 
+ */ + private <R> R executeQuery(GridCacheContext<?,?> cctx, IgniteOutClosureX<R> clo) + throws IgniteCheckedException { + final long start = U.currentTimeMillis(); + + Throwable err = null; + + R res = null; + + try { + res = clo.apply(); + + return res; + } + catch (GridClosureException e) { + err = e.unwrap(); + + throw (IgniteCheckedException)err; + } + finally { + GridCacheQueryMetricsAdapter metrics = (GridCacheQueryMetricsAdapter)cctx.queries().metrics(); + + onExecuted(cctx, metrics, res, err, start, U.currentTimeMillis() - start, log); + } + } + + /** + * @param cctx Cctx. + * @param metrics Metrics. + * @param res Result. + * @param err Err. + * @param startTime Start time. + * @param duration Duration. + * @param log Logger. + */ + public static void onExecuted(GridCacheContext<?, ?> cctx, GridCacheQueryMetricsAdapter metrics, + Object res, Throwable err, long startTime, long duration, IgniteLogger log) { + boolean fail = err != null; + + // Update own metrics. + metrics.onQueryExecute(duration, fail); + + // Update metrics in query manager. + cctx.queries().onMetricsUpdate(duration, fail); + + if (log.isTraceEnabled()) + log.trace("Query execution finished [startTime=" + startTime + + ", duration=" + duration + ", fail=" + (err != null) + ", res=" + res + ']'); + } + + /** * */ private abstract static class Property { @@ -1538,9 +1649,6 @@ public class GridQueryProcessor extends GridProcessorAdapter { */ private static class TypeDescriptor implements GridQueryTypeDescriptor { /** */ - private CacheConfiguration<?,?> ccfg; - - /** */ private String name; /** Value field names and types with preserved order. */ @@ -1571,13 +1679,6 @@ public class GridQueryProcessor extends GridProcessorAdapter { private boolean registered; /** - * @param ccfg Cache configuration. - */ - private TypeDescriptor(CacheConfiguration<?,?> ccfg) { - this.ccfg = ccfg; - } - - /** * @return {@code True} if type registration in SPI was finished and type was not rejected. */ boolean registered() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java index 22d1ff0..64eb1c1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java @@ -27,6 +27,7 @@ import org.apache.ignite.internal.managers.eventstorage.*; import org.apache.ignite.internal.processors.*; import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.query.*; import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.internal.processors.timeout.*; import org.apache.ignite.internal.util.*; @@ -69,7 +70,7 @@ public class GridServiceProcessor extends GridProcessorAdapter { private final ConcurrentMap<String, GridFutureAdapter<?>> undepFuts = new ConcurrentHashMap8<>(); /** Deployment executor service. */ - private final ExecutorService depExe = Executors.newSingleThreadExecutor(); + private final ExecutorService depExe; /** Busy lock. 
*/ private final GridSpinBusyLock busyLock = new GridSpinBusyLock(); @@ -97,6 +98,8 @@ public class GridServiceProcessor extends GridProcessorAdapter { */ public GridServiceProcessor(GridKernalContext ctx) { super(ctx); + + depExe = Executors.newSingleThreadExecutor(new IgniteThreadFactory(ctx.gridName(), "srvc-deploy")); } /** {@inheritDoc} */ @@ -128,10 +131,10 @@ public class GridServiceProcessor extends GridProcessorAdapter { ctx.cache().context().deploy().ignoreOwnership(true); cfgQryId = cache.context().continuousQueries().executeInternalQuery( - new DeploymentListener(), null, true, true); + new DeploymentListener(), null, cache.context().affinityNode(), true); assignQryId = cache.context().continuousQueries().executeInternalQuery( - new AssignmentListener(), null, true, true); + new AssignmentListener(), null, cache.context().affinityNode(), true); } finally { if (ctx.deploy().enabled()) @@ -345,7 +348,12 @@ public class GridServiceProcessor extends GridProcessorAdapter { "different configuration) [deployed=" + dep.configuration() + ", new=" + cfg + ']')); } else { - for (Cache.Entry<Object, Object> e : cache.entrySetx()) { + Iterator<Cache.Entry<Object, Object>> it = serviceEntries( + ServiceAssignmentsPredicate.INSTANCE); + + while (it.hasNext()) { + Cache.Entry<Object, Object> e = it.next(); + if (e.getKey() instanceof GridServiceAssignmentsKey) { GridServiceAssignments assigns = (GridServiceAssignments)e.getValue(); @@ -437,7 +445,11 @@ public class GridServiceProcessor extends GridProcessorAdapter { public IgniteInternalFuture<?> cancelAll() { Collection<IgniteInternalFuture<?>> futs = new ArrayList<>(); - for (Cache.Entry<Object, Object> e : cache.entrySetx()) { + Iterator<Cache.Entry<Object, Object>> it = serviceEntries(ServiceDeploymentPredicate.INSTANCE); + + while (it.hasNext()) { + Cache.Entry<Object, Object> e = it.next(); + if (!(e.getKey() instanceof GridServiceDeploymentKey)) continue; @@ -456,7 +468,11 @@ public class GridServiceProcessor extends GridProcessorAdapter { public Collection<ServiceDescriptor> serviceDescriptors() { Collection<ServiceDescriptor> descs = new ArrayList<>(); - for (Cache.Entry<Object, Object> e : cache.entrySetx()) { + Iterator<Cache.Entry<Object, Object>> it = serviceEntries(ServiceDeploymentPredicate.INSTANCE); + + while (it.hasNext()) { + Cache.Entry<Object, Object> e = it.next(); + if (!(e.getKey() instanceof GridServiceDeploymentKey)) continue; @@ -904,6 +920,43 @@ public class GridServiceProcessor extends GridProcessorAdapter { } /** + * @param p Entry predicate used to execute query from client node. + * @return Service deployment entries. 
+ */ + @SuppressWarnings("unchecked") + private Iterator<Cache.Entry<Object, Object>> serviceEntries(IgniteBiPredicate<Object, Object> p) { + if (!cache.context().affinityNode()) { + ClusterNode oldestSrvNode = + CU.oldestAliveCacheServerNode(cache.context().shared(), AffinityTopologyVersion.NONE); + + if (oldestSrvNode == null) + return F.emptyIterator(); + + GridCacheQueryManager qryMgr = cache.context().queries(); + + CacheQuery<Map.Entry<Object, Object>> qry = qryMgr.createScanQuery(p, false); + + qry.keepAll(false); + + qry.projection(ctx.cluster().get().forNode(oldestSrvNode)); + + return cache.context().itHolder().iterator(qry.execute(), + new CacheIteratorConverter<Object, Map.Entry<Object,Object>>() { + @Override protected Object convert(Map.Entry<Object, Object> e) { + return new CacheEntryImpl<>(e.getKey(), e.getValue()); + } + + @Override protected void remove(Object item) { + throw new UnsupportedOperationException(); + } + } + ); + } + else + return cache.entrySetx().iterator(); + } + + /** * Service deployment listener. */ private class DeploymentListener implements CacheEntryUpdatedListener<Object, Object> { @@ -1045,18 +1098,24 @@ public class GridServiceProcessor extends GridProcessorAdapter { try { depExe.submit(new BusyRunnable() { @Override public void run0() { - long topVer = ((DiscoveryEvent)evt).topologyVersion(); + AffinityTopologyVersion topVer = + new AffinityTopologyVersion(((DiscoveryEvent)evt).topologyVersion()); - ClusterNode oldest = U.oldest(ctx.discovery().nodes(topVer), null); + ClusterNode oldest = CU.oldestAliveCacheServerNode(cache.context().shared(), topVer); - if (oldest.isLocal()) { + if (oldest != null && oldest.isLocal()) { final Collection<GridServiceDeployment> retries = new ConcurrentLinkedQueue<>(); if (ctx.deploy().enabled()) ctx.cache().context().deploy().ignoreOwnership(true); try { - for (Cache.Entry<Object, Object> e : cache.entrySetx()) { + Iterator<Cache.Entry<Object, Object>> it = serviceEntries( + ServiceDeploymentPredicate.INSTANCE); + + while (it.hasNext()) { + Cache.Entry<Object, Object> e = it.next(); + if (!(e.getKey() instanceof GridServiceDeploymentKey)) continue; @@ -1068,7 +1127,7 @@ public class GridServiceProcessor extends GridProcessorAdapter { ctx.cache().internalCache(UTILITY_CACHE_NAME).context().affinity(). affinityReadyFuture(topVer).get(); - reassign(dep, topVer); + reassign(dep, topVer.topologyVersion()); } catch (IgniteCheckedException ex) { if (!(e instanceof ClusterTopologyCheckedException)) @@ -1085,7 +1144,7 @@ public class GridServiceProcessor extends GridProcessorAdapter { } if (!retries.isEmpty()) - onReassignmentFailed(topVer, retries); + onReassignmentFailed(topVer.topologyVersion(), retries); } // Clean up zombie assignments. 
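The serviceEntries helper added to GridServiceProcessor above chooses between two paths: an affinity node iterates the utility cache locally, while a client node projects a scan query, filtered by a key-type predicate, onto the oldest alive cache server node. A minimal sketch of that local-versus-remote decision follows, with plain JDK types standing in for the Ignite cache and query machinery; the remoteScan hook is hypothetical.

    import java.util.Iterator;
    import java.util.Map;
    import java.util.concurrent.Callable;

    /** Sketch of the local-vs-remote branch behind serviceEntries(); all types are stand-ins. */
    final class ServiceEntriesSketch {
        static Iterator<Map.Entry<Object, Object>> serviceEntries(
            boolean affinityNode,
            Map<Object, Object> utilityCache,
            Callable<Iterator<Map.Entry<Object, Object>>> remoteScan
        ) throws Exception {
            if (affinityNode)
                return utilityCache.entrySet().iterator(); // Entries are stored locally: iterate directly.

            // Client node: delegate a filtered scan to a server node that holds the cache.
            return remoteScan.call();
        }
    }

In the patch itself, the ServiceDeploymentPredicate and ServiceAssignmentsPredicate classes added further down supply the key-type filter used by that remote scan.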
@@ -1265,4 +1324,46 @@ public class GridServiceProcessor extends GridProcessorAdapter { */ public abstract void run0(); } + + /** + * + */ + static class ServiceDeploymentPredicate implements IgniteBiPredicate<Object, Object> { + /** */ + static final ServiceDeploymentPredicate INSTANCE = new ServiceDeploymentPredicate(); + + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override public boolean apply(Object key, Object val) { + return key instanceof GridServiceDeploymentKey; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ServiceDeploymentPredicate.class, this); + } + } + + /** + * + */ + static class ServiceAssignmentsPredicate implements IgniteBiPredicate<Object, Object> { + /** */ + static final ServiceAssignmentsPredicate INSTANCE = new ServiceAssignmentsPredicate(); + + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override public boolean apply(Object key, Object val) { + return key instanceof GridServiceAssignmentsKey; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ServiceAssignmentsPredicate.class, this); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridSpiTimeoutObject.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridSpiTimeoutObject.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridSpiTimeoutObject.java new file mode 100644 index 0000000..a0fd9b4 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridSpiTimeoutObject.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.timeout; + +import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.spi.*; + +/** + * Wrapper for {@link IgniteSpiTimeoutObject}. + */ +public class GridSpiTimeoutObject implements GridTimeoutObject { + /** */ + @GridToStringInclude + private final IgniteSpiTimeoutObject obj; + + /** + * @param obj SPI object. 
+ */ + public GridSpiTimeoutObject(IgniteSpiTimeoutObject obj) { + this.obj = obj; + } + + /** {@inheritDoc} */ + @Override public void onTimeout() { + obj.onTimeout(); + } + + /** {@inheritDoc} */ + @Override public IgniteUuid timeoutId() { + return obj.id(); + } + + /** {@inheritDoc} */ + @Override public long endTime() { + return obj.endTime(); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + assert false; + + return super.hashCode(); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object obj) { + assert false; + + return super.equals(obj); + } + + /** {@inheritDoc} */ + @Override public final String toString() { + return S.toString(GridSpiTimeoutObject.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java index 81ff72b..e4f370c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java @@ -21,11 +21,14 @@ import org.apache.ignite.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.*; import org.apache.ignite.internal.util.*; +import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.internal.util.worker.*; +import org.apache.ignite.lang.*; import org.apache.ignite.thread.*; +import java.io.*; import java.util.*; /** @@ -40,10 +43,12 @@ public class GridTimeoutProcessor extends GridProcessorAdapter { new GridConcurrentSkipListSet<>(new Comparator<GridTimeoutObject>() { /** {@inheritDoc} */ @Override public int compare(GridTimeoutObject o1, GridTimeoutObject o2) { - long time1 = o1.endTime(); - long time2 = o2.endTime(); + int res = Long.compare(o1.endTime(), o2.endTime()); - return time1 < time2 ? -1 : time1 > time2 ? 1 : o1.timeoutId().compareTo(o2.timeoutId()); + if (res != 0) + return res; + + return o1.timeoutId().compareTo(o2.timeoutId()); } }); @@ -98,6 +103,26 @@ public class GridTimeoutProcessor extends GridProcessorAdapter { } /** + * Schedule the specified timer task for execution at the specified + * time with the specified period, in milliseconds. + * + * @param task Task to execute. + * @param delay Delay to first execution in milliseconds. + * @param period Period for execution in milliseconds or -1. + * @return Cancelable to cancel task. + */ + public CancelableTask schedule(Runnable task, long delay, long period) { + assert delay >= 0 : delay; + assert period > 0 || period == -1 : period; + + CancelableTask obj = new CancelableTask(task, U.currentTimeMillis() + delay, period); + + addTimeoutObject(obj); + + return obj; + } + + /** * @param timeoutObj Timeout object. 
*/ public void removeTimeoutObject(GridTimeoutObject timeoutObj) { @@ -173,4 +198,78 @@ public class GridTimeoutProcessor extends GridProcessorAdapter { X.println(">>> Timeout processor memory stats [grid=" + ctx.gridName() + ']'); X.println(">>> timeoutObjsSize: " + timeoutObjs.size()); } + + /** + * + */ + public class CancelableTask implements GridTimeoutObject, Closeable { + /** */ + private final IgniteUuid id = IgniteUuid.randomUuid(); + + /** */ + private long endTime; + + /** */ + private final long period; + + /** */ + private volatile boolean cancel; + + /** */ + @GridToStringInclude + private final Runnable task; + + /** + * @param task Task to execute. + * @param firstTime First time. + * @param period Period. + */ + CancelableTask(Runnable task, long firstTime, long period) { + this.task = task; + endTime = firstTime; + this.period = period; + } + + /** {@inheritDoc} */ + @Override public IgniteUuid timeoutId() { + return id; + } + + /** {@inheritDoc} */ + @Override public long endTime() { + return endTime; + } + + /** {@inheritDoc} */ + @Override public synchronized void onTimeout() { + if (cancel) + return; + + try { + task.run(); + } + finally { + if (!cancel && period > 0) { + endTime = U.currentTimeMillis() + period; + + addTimeoutObject(this); + } + } + } + + /** {@inheritDoc} */ + @Override public void close() { + cancel = true; + + synchronized (this) { + // Just waiting for task execution end to make sure that task will not be executed anymore. + removeTimeoutObject(this); + } + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(CancelableTask.class, this); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/transactions/IgniteTxRollbackCheckedException.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/transactions/IgniteTxRollbackCheckedException.java b/modules/core/src/main/java/org/apache/ignite/internal/transactions/IgniteTxRollbackCheckedException.java index 1d1e022..f8ee265 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/transactions/IgniteTxRollbackCheckedException.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/transactions/IgniteTxRollbackCheckedException.java @@ -36,6 +36,15 @@ public class IgniteTxRollbackCheckedException extends IgniteCheckedException { } /** + * Creates new exception with given nested exception. + * + * @param cause Nested exception. + */ + public IgniteTxRollbackCheckedException(Throwable cause) { + super(cause); + } + + /** * Creates new rollback exception with given error message and optional nested exception. * * @param msg Error message. 
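Editor's note on the GridTimeoutProcessor changes above: schedule(Runnable, delay, period) builds periodic execution on top of the existing timeout-object machinery. CancelableTask re-registers itself after each run with a new end time, and close() flips a volatile cancel flag and then synchronizes on the task so an in-flight execution finishes before close() returns. The comparator change in the same file (Long.compare plus a timeoutId tie-break) keeps ordering stable for objects that fire in the same millisecond. The stand-alone sketch below mirrors only the re-registration/cancellation pattern on a DelayQueue driver; every name in it is an assumption, and it is an illustration of the idea, not the Ignite implementation.

import java.io.Closeable;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/** Minimal sketch of a self-rescheduling, cancelable timeout task. */
public class SelfReschedulingTaskSketch {
    /** Stands in for the timeout processor's sorted set of timeout objects. */
    private static final DelayQueue<Task> QUEUE = new DelayQueue<>();

    static final class Task implements Delayed, Closeable {
        private final Runnable body;
        private final long period;          // Period in ms, or -1 for a one-shot task.
        private volatile long endTime;      // Next firing time, absolute ms.
        private volatile boolean cancelled;

        Task(Runnable body, long firstTime, long period) {
            this.body = body;
            this.endTime = firstTime;
            this.period = period;
        }

        /** Runs the body and, if still active and periodic, re-registers itself. */
        synchronized void onTimeout() {
            if (cancelled)
                return;

            try {
                body.run();
            }
            finally {
                if (!cancelled && period > 0) {
                    endTime = System.currentTimeMillis() + period;

                    QUEUE.offer(this); // Re-register, as CancelableTask does via addTimeoutObject().
                }
            }
        }

        /** Cancels the task; the synchronized block waits out an in-flight run. */
        @Override public void close() {
            cancelled = true;

            synchronized (this) {
                QUEUE.remove(this);
            }
        }

        @Override public long getDelay(TimeUnit unit) {
            return unit.convert(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        @Override public int compareTo(Delayed o) {
            return Long.compare(getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Task task = new Task(() -> System.out.println("tick"), System.currentTimeMillis() + 100, 200);

        QUEUE.offer(task);

        // Single-threaded driver loop, standing in for the timeout worker thread.
        for (int i = 0; i < 3; i++)
            QUEUE.take().onTimeout();

        task.close(); // No further executions after this returns.
    }
}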
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/util/GridJavaProcess.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridJavaProcess.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridJavaProcess.java index bff26ec..42fe089 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridJavaProcess.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridJavaProcess.java @@ -128,25 +128,31 @@ public final class GridJavaProcess { gjProc.log = log; gjProc.procKilledC = procKilledC; - String javaBin = System.getProperty("java.home") + File.separator + "bin" + File.separator + "java"; - String classpath = System.getProperty("java.class.path"); - String sfcp = System.getProperty("surefire.test.class.path"); - - if (sfcp != null) - classpath += System.getProperty("path.separator") + sfcp; - - if (cp != null) - classpath += System.getProperty("path.separator") + cp; - List<String> procParams = params == null || params.isEmpty() ? Collections.<String>emptyList() : Arrays.asList(params.split(" ")); List<String> procCommands = new ArrayList<>(); + String javaBin = System.getProperty("java.home") + File.separator + "bin" + File.separator + "java"; + procCommands.add(javaBin); procCommands.addAll(jvmArgs == null ? U.jvmArgs() : jvmArgs); - procCommands.add("-cp"); - procCommands.add(classpath); + + if (!jvmArgs.contains("-cp") && !jvmArgs.contains("-classpath")) { + String classpath = System.getProperty("java.class.path"); + + String sfcp = System.getProperty("surefire.test.class.path"); + + if (sfcp != null) + classpath += System.getProperty("path.separator") + sfcp; + + if (cp != null) + classpath += System.getProperty("path.separator") + cp; + + procCommands.add("-cp"); + procCommands.add(classpath); + } + procCommands.add(clsName); procCommands.addAll(procParams); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java index fb9ad29..f8caf22 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java @@ -241,8 +241,8 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements lsnr.apply(this); } catch (IllegalStateException e) { - U.warn(null, "Failed to notify listener (is grid stopped?) [fut=" + this + - ", lsnr=" + lsnr + ", err=" + e.getMessage() + ']'); + U.error(null, "Failed to notify listener (is grid stopped?) 
[fut=" + this + + ", lsnr=" + lsnr + ", err=" + e.getMessage() + ']', e); } catch (RuntimeException | Error e) { U.error(null, "Failed to notify listener: " + lsnr, e); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java index 31396fb..693a5a4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java @@ -38,58 +38,58 @@ public interface GridCommunicationClient { * @param handshakeC Handshake. * @throws IgniteCheckedException If handshake failed. */ - void doHandshake(IgniteInClosure2X<InputStream, OutputStream> handshakeC) throws IgniteCheckedException; + public void doHandshake(IgniteInClosure2X<InputStream, OutputStream> handshakeC) throws IgniteCheckedException; /** * @return {@code True} if client has been closed by this call, * {@code false} if failed to close client (due to concurrent reservation or concurrent close). */ - boolean close(); + public boolean close(); /** * Forces client close. */ - void forceClose(); + public void forceClose(); /** * @return {@code True} if client is closed; */ - boolean closed(); + public boolean closed(); /** * @return {@code True} if client was reserved, {@code false} otherwise. */ - boolean reserve(); + public boolean reserve(); /** * Releases this client by decreasing reservations. */ - void release(); + public void release(); /** * @return {@code True} if client was reserved. */ - boolean reserved(); + public boolean reserved(); /** * Gets idle time of this client. * * @return Idle time of this client. */ - long getIdleTime(); + public long getIdleTime(); /** * @param data Data to send. * @throws IgniteCheckedException If failed. */ - void sendMessage(ByteBuffer data) throws IgniteCheckedException; + public void sendMessage(ByteBuffer data) throws IgniteCheckedException; /** * @param data Data to send. * @param len Length. * @throws IgniteCheckedException If failed. */ - void sendMessage(byte[] data, int len) throws IgniteCheckedException; + public void sendMessage(byte[] data, int len) throws IgniteCheckedException; /** * @param nodeId Node ID (provided only if versions of local and remote nodes are different). @@ -97,16 +97,10 @@ public interface GridCommunicationClient { * @throws IgniteCheckedException If failed. * @return {@code True} if should try to resend message. */ - boolean sendMessage(@Nullable UUID nodeId, Message msg) throws IgniteCheckedException; - - /** - * @param timeout Timeout. - * @throws IOException If failed. - */ - void flushIfNeeded(long timeout) throws IOException; + public boolean sendMessage(@Nullable UUID nodeId, Message msg) throws IgniteCheckedException; /** * @return {@code True} if send is asynchronous. 
*/ - boolean async(); + public boolean async(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBuffer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBuffer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBuffer.java index 2b764ec..44ab4a5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBuffer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBuffer.java @@ -85,7 +85,7 @@ public class GridNioDelimitedBuffer { idx++; } else { - pos = cnt - idx; + pos = cnt - (i - pos) - 1; idx = 0; }
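Editor's note on the last hunk: it adjusts the rewind arithmetic applied when a partial delimiter match fails in GridNioDelimitedBuffer. The hazard such a rewind guards against is that the delimiter may start inside the bytes that were tentatively matched, so they cannot simply be skipped. The deliberately naive, self-contained search below illustrates that property only; it is not the buffer's single-pass implementation, and the method name is an assumption.

/**
 * Naive delimiter search illustrating why a failed partial match must be
 * re-scanned: the next candidate start is one byte past the previous start,
 * so tentatively matched bytes are re-examined rather than skipped.
 */
public class DelimiterScanSketch {
    /** @return Index where {@code delim} starts in {@code data[0..len)}, or -1 if absent. */
    static int indexOfDelimiter(byte[] data, int len, byte[] delim) {
        for (int start = 0; start + delim.length <= len; start++) {
            int matched = 0;

            while (matched < delim.length && data[start + matched] == delim[matched])
                matched++;

            if (matched == delim.length)
                return start;
            // Otherwise retry from start + 1; skipping 'matched' bytes here would
            // miss a delimiter that begins inside the failed partial match.
        }

        return -1;
    }

    public static void main(String[] args) {
        byte[] delim = {1, 1, 2};
        byte[] data = {0, 1, 1, 1, 2, 3}; // Partial match at index 1 fails; the real match starts at 2.

        System.out.println(indexOfDelimiter(data, data.length, delim)); // Prints 2.
    }
}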