http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsFragmentizerManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsFragmentizerManager.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsFragmentizerManager.java
deleted file mode 100644
index 5ca06f5..0000000
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsFragmentizerManager.java
+++ /dev/null
@@ -1,829 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.gridgain.grid.kernal.processors.ggfs;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.events.*;
-import org.apache.ignite.internal.util.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.thread.*;
-import org.apache.ignite.internal.managers.communication.*;
-import org.apache.ignite.internal.managers.eventstorage.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.internal.util.worker.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.locks.*;
-
-import static java.util.concurrent.TimeUnit.*;
-import static org.apache.ignite.events.IgniteEventType.*;
-import static org.apache.ignite.internal.GridTopic.*;
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
-import static 
org.gridgain.grid.kernal.processors.ggfs.GridGgfsFileAffinityRange.*;
-
-/**
- * GGFS fragmentizer manager.
- */
-public class GridGgfsFragmentizerManager extends GridGgfsManager {
-    /** Message offer wait interval. */
-    private static final int MSG_OFFER_TIMEOUT = 1000;
-
-    /** Fragmentizer files check interval. */
-    private static final int FRAGMENTIZER_CHECK_INTERVAL = 3000;
-
-    /** Message send retry interval. */
-    private static final int MESSAGE_SEND_RETRY_INTERVAL = 1000;
-
-    /** How many times retry message send. */
-    private static final int MESSAGE_SEND_RETRY_COUNT = 3;
-
-    /** Manager stopping flag. */
-    private volatile boolean stopping;
-
-    /** Coordinator worker. */
-    private volatile FragmentizerCoordinator fragmentizerCrd;
-
-    /** This variable is used in tests only. */
-    @SuppressWarnings("FieldCanBeLocal")
-    private volatile boolean fragmentizerEnabled = true;
-
-    /** Fragmentizer worker. */
-    private FragmentizerWorker fragmentizerWorker;
-
-    /** Shutdown lock. */
-    private GridSpinReadWriteLock rw = new GridSpinReadWriteLock();
-
-    /** Message topic. */
-    private Object topic;
-
-    /** {@inheritDoc} */
-    @Override protected void start0() throws IgniteCheckedException {
-        if (!ggfsCtx.configuration().isFragmentizerEnabled())
-            return;
-
-        // We care only about node leave and fail events.
-        ggfsCtx.kernalContext().event().addLocalEventListener(new 
GridLocalEventListener() {
-            @Override public void onEvent(IgniteEvent evt) {
-                assert evt.type() == EVT_NODE_LEFT || evt.type() == 
EVT_NODE_FAILED;
-
-                IgniteDiscoveryEvent discoEvt = (IgniteDiscoveryEvent)evt;
-
-                checkLaunchCoordinator(discoEvt);
-            }
-        }, EVT_NODE_LEFT, EVT_NODE_FAILED);
-
-        fragmentizerWorker = new FragmentizerWorker();
-
-        String ggfsName = ggfsCtx.configuration().getName();
-
-        topic = F.isEmpty(ggfsName) ? TOPIC_GGFS : TOPIC_GGFS.topic(ggfsName);
-
-        ggfsCtx.kernalContext().io().addMessageListener(topic, 
fragmentizerWorker);
-
-        new IgniteThread(fragmentizerWorker).start();
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void onKernalStart0() throws IgniteCheckedException {
-        if (ggfsCtx.configuration().isFragmentizerEnabled()) {
-            // Check at startup if this node is a fragmentizer coordinator.
-            IgniteDiscoveryEvent locJoinEvt = 
ggfsCtx.kernalContext().discovery().localJoinEvent();
-
-            checkLaunchCoordinator(locJoinEvt);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("BusyWait")
-    @Override protected void onKernalStop0(boolean cancel) {
-        boolean interrupted = false;
-
-        // Busy wait is intentional.
-        while (true) {
-            try {
-                if (rw.tryWriteLock(200, TimeUnit.MILLISECONDS))
-                    break;
-                else
-                    Thread.sleep(200);
-            }
-            catch (InterruptedException ignore) {
-                // Preserve interrupt status & ignore.
-                // Note that interrupted flag is cleared.
-                interrupted = true;
-            }
-        }
-
-        try {
-            if (interrupted)
-                Thread.currentThread().interrupt();
-
-            stopping = true;
-        }
-        finally {
-            rw.writeUnlock();
-        }
-
-        synchronized (this) {
-            if (fragmentizerCrd != null)
-                fragmentizerCrd.cancel();
-        }
-
-        if (fragmentizerWorker != null)
-            fragmentizerWorker.cancel();
-
-        U.join(fragmentizerCrd, log);
-        U.join(fragmentizerWorker, log);
-    }
-
-    /**
-     * @param nodeId Node ID to send message to.
-     * @param msg Message to send.
-     * @throws IgniteCheckedException If send failed.
-     */
-    private void sendWithRetries(UUID nodeId, GridGgfsCommunicationMessage 
msg) throws IgniteCheckedException {
-        for (int i = 0; i < MESSAGE_SEND_RETRY_COUNT; i++) {
-            try {
-                ggfsCtx.send(nodeId, topic, msg, SYSTEM_POOL);
-
-                return;
-            }
-            catch (IgniteCheckedException e) {
-                if (!ggfsCtx.kernalContext().discovery().alive(nodeId))
-                    throw new ClusterTopologyException("Failed to send message 
(node left the grid) " +
-                        "[nodeId=" + nodeId + ", msg=" + msg + ']');
-
-                if (i == MESSAGE_SEND_RETRY_COUNT - 1)
-                    throw e;
-
-                U.sleep(MESSAGE_SEND_RETRY_INTERVAL);
-            }
-        }
-    }
-
-    /**
-     * Checks if current node is the oldest node in topology and starts 
coordinator thread if so.
-     * Note that once node is the oldest one, it will be the oldest until it 
leaves grid.
-     *
-     * @param discoEvt Discovery event.
-     */
-    private void checkLaunchCoordinator(IgniteDiscoveryEvent discoEvt) {
-        rw.readLock();
-
-        try {
-            if (stopping)
-                return;
-
-            if (fragmentizerCrd == null) {
-                long minNodeOrder = Long.MAX_VALUE;
-
-                Collection<ClusterNode> nodes = discoEvt.topologyNodes();
-
-                for (ClusterNode node : nodes) {
-                    if (node.order() < minNodeOrder && ggfsCtx.ggfsNode(node))
-                        minNodeOrder = node.order();
-                }
-
-                ClusterNode locNode = 
ggfsCtx.kernalContext().grid().localNode();
-
-                if (locNode.order() == minNodeOrder) {
-                    if (log.isDebugEnabled())
-                        log.debug("Detected local node to be the eldest GGFS 
node in topology, starting fragmentizer " +
-                            "coordinator thread [discoEvt=" + discoEvt + ", 
locNode=" + locNode + ']');
-
-                    synchronized (this) {
-                        if (fragmentizerCrd == null && !stopping) {
-                            fragmentizerCrd = new FragmentizerCoordinator();
-
-                            new IgniteThread(fragmentizerCrd).start();
-                        }
-                    }
-                }
-            }
-        }
-        finally {
-            rw.readUnlock();
-        }
-    }
-
-    /**
-     * Processes fragmentizer request. For each range assigned to this node:
-     * <ul>
-     *     <li>Mark range as moving indicating that block copying started.</li>
-     *     <li>Copy blocks to non-colocated keys.</li>
-     *     <li>Update map to indicate that blocks were copied and old blocks 
should be deleted.</li>
-     *     <li>Delete old blocks.</li>
-     *     <li>Remove range from file map.</li>
-     * </ul>
-     *
-     * @param req Request.
-     * @throws IgniteCheckedException In case of error.
-     */
-    @SuppressWarnings("fallthrough")
-    private void processFragmentizerRequest(GridGgfsFragmentizerRequest req) 
throws IgniteCheckedException {
-        req.finishUnmarshal(ggfsCtx.kernalContext().config().getMarshaller(), 
null);
-
-        Collection<GridGgfsFileAffinityRange> ranges = req.fragmentRanges();
-        IgniteUuid fileId = req.fileId();
-
-        GridGgfsFileInfo fileInfo = ggfsCtx.meta().info(fileId);
-
-        if (fileInfo == null) {
-            if (log.isDebugEnabled())
-                log.debug("Failed to find file info for fragmentizer request: 
" + req);
-
-            return;
-        }
-
-        if (log.isDebugEnabled())
-            log.debug("Moving file ranges for fragmentizer request [req=" + 
req + ", fileInfo=" + fileInfo + ']');
-
-        for (GridGgfsFileAffinityRange range : ranges) {
-            try {
-                GridGgfsFileInfo updated;
-
-                switch (range.status()) {
-                    case RANGE_STATUS_INITIAL: {
-                        // Mark range as moving.
-                        updated = ggfsCtx.meta().updateInfo(fileId, 
updateRange(range, RANGE_STATUS_MOVING));
-
-                        if (updated == null) {
-                            ggfsCtx.data().cleanBlocks(fileInfo, range, true);
-
-                            continue;
-                        }
-
-                        // Fall-through.
-                    }
-
-                    case RANGE_STATUS_MOVING: {
-                        // Move colocated blocks.
-                        ggfsCtx.data().spreadBlocks(fileInfo, range);
-
-                        // Mark range as moved.
-                        updated = ggfsCtx.meta().updateInfo(fileId, 
updateRange(range, RANGE_STATUS_MOVED));
-
-                        if (updated == null) {
-                            ggfsCtx.data().cleanBlocks(fileInfo, range, true);
-
-                            continue;
-                        }
-
-                        // Fall-through.
-                    }
-
-                    case RANGE_STATUS_MOVED: {
-                        // Remove old blocks.
-                        ggfsCtx.data().cleanBlocks(fileInfo, range, false);
-
-                        // Remove range from map.
-                        updated = ggfsCtx.meta().updateInfo(fileId, 
deleteRange(range));
-
-                        if (updated == null)
-                            ggfsCtx.data().cleanBlocks(fileInfo, range, true);
-                    }
-                }
-            }
-            catch (GridGgfsInvalidRangeException e) {
-                if (log.isDebugEnabled())
-                    log.debug("Failed to update file range " +
-                        "[range=" + range + "fileId=" + fileId + ", err=" + 
e.getMessage() + ']');
-            }
-        }
-    }
-
-    /**
-     * Creates update info closure that will mark given range as moving.
-     *
-     * @param range Range to mark as moving.
-     * @param status Status.
-     * @return Update closure.
-     */
-    private IgniteClosure<GridGgfsFileInfo, GridGgfsFileInfo> 
updateRange(final GridGgfsFileAffinityRange range,
-        final int status) {
-        return new CX1<GridGgfsFileInfo, GridGgfsFileInfo>() {
-            @Override public GridGgfsFileInfo applyx(GridGgfsFileInfo info) 
throws IgniteCheckedException {
-                GridGgfsFileMap map = new GridGgfsFileMap(info.fileMap());
-
-                map.updateRangeStatus(range, status);
-
-                if (log.isDebugEnabled())
-                    log.debug("Updated file map for range [fileId=" + 
info.id() + ", range=" + range +
-                        ", status=" + status + ", oldMap=" + info.fileMap() + 
", newMap=" + map + ']');
-
-                GridGgfsFileInfo updated = new GridGgfsFileInfo(info, 
info.length());
-
-                updated.fileMap(map);
-
-                return updated;
-            }
-        };
-    }
-
-    /**
-     * Creates update info closure that will mark given range as moving.
-     *
-     * @param range Range to mark as moving.
-     * @return Update closure.
-     */
-    private IgniteClosure<GridGgfsFileInfo, GridGgfsFileInfo> 
deleteRange(final GridGgfsFileAffinityRange range) {
-        return new CX1<GridGgfsFileInfo, GridGgfsFileInfo>() {
-            @Override public GridGgfsFileInfo applyx(GridGgfsFileInfo info) 
throws IgniteCheckedException {
-                GridGgfsFileMap map = new GridGgfsFileMap(info.fileMap());
-
-                map.deleteRange(range);
-
-                if (log.isDebugEnabled())
-                    log.debug("Deleted range from file map [fileId=" + 
info.id() + ", range=" + range +
-                        ", oldMap=" + info.fileMap() + ", newMap=" + map + 
']');
-
-                GridGgfsFileInfo updated = new GridGgfsFileInfo(info, 
info.length());
-
-                updated.fileMap(map);
-
-                return updated;
-            }
-        };
-    }
-
-    /**
-     * Fragmentizer coordinator thread.
-     */
-    private class FragmentizerCoordinator extends GridWorker implements 
GridLocalEventListener, GridMessageListener {
-        /** Files being fragmented. */
-        private ConcurrentMap<IgniteUuid, Collection<UUID>> fragmentingFiles = 
new ConcurrentHashMap<>();
-
-        /** Node IDs captured on start. */
-        private volatile Collection<UUID> startSync;
-
-        /** Wait lock. */
-        private Lock lock = new ReentrantLock();
-
-        /** Wait condition. */
-        private Condition cond = lock.newCondition();
-
-        /**
-         * Constructor.
-         */
-        protected FragmentizerCoordinator() {
-            super(ggfsCtx.kernalContext().gridName(), 
"fragmentizer-coordinator", ggfsCtx.kernalContext().log());
-
-            ggfsCtx.kernalContext().event().addLocalEventListener(this, 
EVT_NODE_LEFT, EVT_NODE_FAILED);
-            ggfsCtx.kernalContext().io().addMessageListener(topic, this);
-        }
-
-        /** {@inheritDoc} */
-        @Override protected void body() throws InterruptedException, 
IgniteInterruptedException {
-            // Wait for all previous fragmentizer tasks to complete.
-            syncStart();
-
-            while (!isCancelled()) {
-                // If we have room for files, add them to fragmentizer.
-                try {
-                    while (fragmentingFiles.size() < 
ggfsCtx.configuration().getFragmentizerConcurrentFiles()) {
-                        GridGgfsFileInfo fileInfo = 
fileForFragmentizer(fragmentingFiles.keySet());
-
-                        // If no colocated files found, exit loop.
-                        if (fileInfo == null)
-                            break;
-
-                        requestFragmenting(fileInfo);
-                    }
-                }
-                catch (IgniteCheckedException | IgniteException e) {
-                    if (!X.hasCause(e, InterruptedException.class) && 
!X.hasCause(e, IgniteInterruptedException.class))
-                        LT.warn(log, e, "Failed to get fragmentizer file info 
(will retry).");
-                    else {
-                        if (log.isDebugEnabled())
-                            log.debug("Got interrupted exception in 
fragmentizer coordinator (grid is stopping).");
-
-                        break; // While.
-                    }
-                }
-
-                lock.lock();
-
-                try {
-                    cond.await(FRAGMENTIZER_CHECK_INTERVAL, MILLISECONDS);
-                }
-                finally {
-                    lock.unlock();
-                }
-            }
-        }
-
-        /** {@inheritDoc} */
-        @Override public void onEvent(IgniteEvent evt) {
-            assert evt.type() == EVT_NODE_LEFT || evt.type() == 
EVT_NODE_FAILED;
-
-            IgniteDiscoveryEvent discoEvt = (IgniteDiscoveryEvent)evt;
-
-            if (log.isDebugEnabled())
-                log.debug("Processing node leave event: " + discoEvt);
-
-            boolean signal = false;
-
-            Collection<UUID> startSync0 = startSync;
-
-            if (startSync0 != null && !startSync0.isEmpty()) {
-                startSync0.remove(discoEvt.eventNode().id());
-
-                if (startSync0.isEmpty()) {
-                    if (log.isDebugEnabled())
-                        log.debug("Completed fragmentizer coordinator sync 
start.");
-
-                    signal = true;
-                }
-            }
-
-            if (!signal) {
-                Iterator<Map.Entry<IgniteUuid, Collection<UUID>>> it = 
fragmentingFiles.entrySet().iterator();
-
-                while (it.hasNext()) {
-                    Map.Entry<IgniteUuid, Collection<UUID>> entry = it.next();
-
-                    Collection<UUID> nodeIds = entry.getValue();
-
-                    if (nodeIds.remove(discoEvt.eventNode().id())) {
-                        if (nodeIds.isEmpty()) {
-                            if (log.isDebugEnabled())
-                                log.debug("Received all responses for 
fragmentizer task [fileId=" + entry.getKey() +
-                                    ']');
-
-                            it.remove();
-
-                            signal = true;
-                        }
-                    }
-                }
-            }
-
-            if (signal)
-                wakeUp();
-        }
-
-        /** {@inheritDoc} */
-        @Override public void onMessage(UUID nodeId, Object msg) {
-            if (msg instanceof GridGgfsFragmentizerResponse) {
-                GridGgfsFragmentizerResponse res = 
(GridGgfsFragmentizerResponse)msg;
-
-                IgniteUuid fileId = res.fileId();
-
-                Collection<UUID> nodeIds = fragmentingFiles.get(fileId);
-
-                if (nodeIds != null) {
-                    if (nodeIds.remove(nodeId)) {
-                        if (nodeIds.isEmpty()) {
-                            if (log.isDebugEnabled())
-                                log.debug("Received all responses for 
fragmentizer task [fileId=" + fileId + ']');
-
-                            fragmentingFiles.remove(fileId, nodeIds);
-
-                            wakeUp();
-                        }
-                    }
-                }
-                else
-                    log.warning("Received fragmentizer response for file ID 
which was not requested (will ignore) " +
-                        "[nodeId=" + nodeId + ", fileId=" + res.fileId() + 
']');
-            }
-            else if (msg instanceof GridGgfsSyncMessage) {
-                GridGgfsSyncMessage sync = (GridGgfsSyncMessage)msg;
-
-                if (sync.response() && sync.order() == 
ggfsCtx.kernalContext().grid().localNode().order()) {
-                    if (log.isDebugEnabled())
-                        log.debug("Received fragmentizer sync response from 
remote node: " + nodeId);
-
-                    Collection<UUID> startSync0 = startSync;
-
-                    if (startSync0 != null) {
-                        startSync0.remove(nodeId);
-
-                        if (startSync0.isEmpty()) {
-                            if (log.isDebugEnabled())
-                                log.debug("Completed fragmentizer coordinator 
sync start: " + startSync0);
-
-                            wakeUp();
-                        }
-                    }
-                }
-            }
-        }
-
-        /**
-         * Signals condition.
-         */
-        private void wakeUp() {
-            lock.lock();
-
-            try {
-                cond.signalAll();
-            }
-            finally {
-                lock.unlock();
-            }
-        }
-
-        /**
-         * Sends sync message to remote nodes and awaits for response from all 
nodes.
-         *
-         * @throws InterruptedException If waiting was interrupted.
-         */
-        private void syncStart() throws InterruptedException {
-            Collection<UUID> startSync0 = startSync = new 
GridConcurrentHashSet<>(
-                F.viewReadOnly(
-                    ggfsCtx.kernalContext().discovery().allNodes(),
-                    F.node2id(),
-                    new P1<ClusterNode>() {
-                        @Override public boolean apply(ClusterNode n) {
-                            return ggfsCtx.ggfsNode(n);
-                        }
-                    }));
-
-            ClusterNode locNode = ggfsCtx.kernalContext().grid().localNode();
-
-            while (!startSync0.isEmpty()) {
-                for (UUID nodeId : startSync0) {
-                    GridGgfsSyncMessage syncReq = new 
GridGgfsSyncMessage(locNode.order(), false);
-
-                    try {
-                        if (log.isDebugEnabled())
-                            log.debug("Sending fragmentizer sync start request 
to remote node [nodeId=" + nodeId +
-                                ", syncReq=" + syncReq + ']');
-
-                        sendWithRetries(nodeId, syncReq);
-
-                        // Close window between message sending and discovery 
event.
-                        if (!ggfsCtx.kernalContext().discovery().alive(nodeId))
-                            startSync0.remove(nodeId);
-                    }
-                    catch (IgniteCheckedException e) {
-                        if (e.hasCause(ClusterTopologyException.class)) {
-                            if (log.isDebugEnabled())
-                                log.debug("Failed to send sync message to 
remote node (node has left the grid): " +
-                                    nodeId);
-                        }
-                        else
-                            U.error(log, "Failed to send synchronize message 
to remote node (will not wait for reply): " +
-                                nodeId, e);
-
-                        startSync0.remove(nodeId);
-                    }
-                }
-
-                lock.lock();
-
-                try {
-                    if (!startSync0.isEmpty())
-                        cond.await(10000, MILLISECONDS);
-                }
-                finally {
-                    lock.unlock();
-                }
-            }
-        }
-
-        /**
-         * Starts file fragmenting. Will group file affinity ranges by nodes 
and send requests to each node.
-         * File will be considered processed when each node replied with 
success (or error) or left the grid.
-         *
-         * @param fileInfo File info to process.
-         */
-        private void requestFragmenting(GridGgfsFileInfo fileInfo) {
-            GridGgfsFileMap map = fileInfo.fileMap();
-
-            assert map != null && !map.ranges().isEmpty();
-
-            Map<UUID, Collection<GridGgfsFileAffinityRange>> grpMap = 
U.newHashMap(map.ranges().size());
-
-            for (GridGgfsFileAffinityRange range : map.ranges()) {
-                UUID nodeId = 
ggfsCtx.data().affinityNode(range.affinityKey()).id();
-
-                Collection<GridGgfsFileAffinityRange> nodeRanges = 
grpMap.get(nodeId);
-
-                if (nodeRanges == null) {
-                    nodeRanges = new LinkedList<>();
-
-                    grpMap.put(nodeId, nodeRanges);
-                }
-
-                
nodeRanges.addAll(range.split(ggfsCtx.data().groupBlockSize()));
-            }
-
-            Collection<UUID> nodeIds = new IdentityHashSet(grpMap.keySet());
-
-            if (log.isDebugEnabled())
-                log.debug("Calculating fragmentizer groups for file 
[fileInfo=" + fileInfo +
-                    ", nodeIds=" + nodeIds + ']');
-
-            // Put assignment to map first.
-            Object old = fragmentingFiles.putIfAbsent(fileInfo.id(), nodeIds);
-
-            assert old == null;
-
-            for (Map.Entry<UUID, Collection<GridGgfsFileAffinityRange>> entry 
: grpMap.entrySet()) {
-                UUID nodeId = entry.getKey();
-
-                GridGgfsFragmentizerRequest msg = new 
GridGgfsFragmentizerRequest(fileInfo.id(), entry.getValue());
-
-                try {
-                    if (log.isDebugEnabled())
-                        log.debug("Sending fragmentizer request to remote node 
[nodeId=" + nodeId +
-                            ", fileId=" + fileInfo.id() + ", msg=" + msg + 
']');
-
-                    sendWithRetries(nodeId, msg);
-                }
-                catch (IgniteCheckedException e) {
-                    if (e.hasCause(ClusterTopologyException.class)) {
-                        if (log.isDebugEnabled())
-                            log.debug("Failed to send fragmentizer request to 
remote node (node left grid): " +
-                                nodeId);
-                    }
-                    else
-                        U.error(log, "Failed to send fragmentizer request to 
remote node [nodeId=" + nodeId +
-                            ", msg=" + msg + ']', e);
-
-                    nodeIds.remove(nodeId);
-                }
-            }
-
-            if (nodeIds.isEmpty()) {
-                if (log.isDebugEnabled())
-                    log.debug("Got empty wait set for fragmentized file: " + 
fileInfo);
-
-                fragmentingFiles.remove(fileInfo.id(), nodeIds);
-            }
-        }
-    }
-
-    /**
-     * Gets next file for fragmentizer to be processed.
-     *
-     * @param exclude File IDs to exclude (the ones that are currently being 
processed).
-     * @return File ID to process or {@code null} if there are no such files.
-     * @throws IgniteCheckedException In case of error.
-     */
-    @Nullable private GridGgfsFileInfo 
fileForFragmentizer(Collection<IgniteUuid> exclude) throws 
IgniteCheckedException {
-        return fragmentizerEnabled ? 
ggfsCtx.meta().fileForFragmentizer(exclude) : null;
-    }
-
-    /**
-     * Fragmentizer worker thread.
-     */
-    private class FragmentizerWorker extends GridWorker implements 
GridMessageListener {
-        /** Requests for this worker. */
-        private BlockingQueue<IgniteBiTuple<UUID, 
GridGgfsCommunicationMessage>> msgs = new LinkedBlockingDeque<>();
-
-        /**
-         * Constructor.
-         */
-        protected FragmentizerWorker() {
-            super(ggfsCtx.kernalContext().gridName(), "fragmentizer-worker", 
ggfsCtx.kernalContext().log());
-        }
-
-        /** {@inheritDoc} */
-        @Override public void onMessage(UUID nodeId, Object msg) {
-            if (msg instanceof GridGgfsFragmentizerRequest ||
-                msg instanceof GridGgfsSyncMessage) {
-                if (log.isDebugEnabled())
-                    log.debug("Received fragmentizer request from remote node 
[nodeId=" + nodeId +
-                        ", msg=" + msg + ']');
-
-                IgniteBiTuple<UUID, GridGgfsCommunicationMessage> tup = 
F.t(nodeId, (GridGgfsCommunicationMessage)msg);
-
-                try {
-                    if (!msgs.offer(tup, MSG_OFFER_TIMEOUT, 
TimeUnit.MILLISECONDS)) {
-                        U.error(log, "Failed to process fragmentizer 
communication message (will discard) " +
-                            "[nodeId=" + nodeId + ", msg=" + msg + ']');
-                    }
-                }
-                catch (InterruptedException ignored) {
-                    Thread.currentThread().interrupt();
-
-                    U.warn(log, "Failed to process fragmentizer communication 
message (thread was interrupted) "+
-                        "[nodeId=" + nodeId + ", msg=" + msg + ']');
-                }
-            }
-        }
-
-        /** {@inheritDoc} */
-        @Override protected void body() throws InterruptedException, 
IgniteInterruptedException {
-            while (!isCancelled()) {
-                IgniteBiTuple<UUID, GridGgfsCommunicationMessage> req = 
msgs.take();
-
-                UUID nodeId = req.get1();
-
-                if (req.get2() instanceof GridGgfsFragmentizerRequest) {
-                    GridGgfsFragmentizerRequest fragmentizerReq = 
(GridGgfsFragmentizerRequest)req.get2();
-
-                    if (!rw.tryReadLock()) {
-                        if (log.isDebugEnabled())
-                            log.debug("Received fragmentizing request while 
stopping grid (will ignore) " +
-                                "[nodeId=" + nodeId + ", req=" + req.get2() + 
']');
-
-                        continue; // while.
-                    }
-
-                    try {
-                        try {
-                            processFragmentizerRequest(fragmentizerReq);
-                        }
-                        catch (IgniteCheckedException e) {
-                            if (e.hasCause(ClusterTopologyException.class)) {
-                                if (log.isDebugEnabled())
-                                    log.debug("Failed to process fragmentizer 
request (remote node left the grid) " +
-                                        "[req=" + req + ", err=" + 
e.getMessage() + ']');
-                            }
-                            else
-                                U.error(log, "Failed to process fragmentizer 
request [nodeId=" + nodeId +
-                                    ", req=" + req + ']', e);
-                        }
-                        finally {
-                            sendResponse(nodeId, new 
GridGgfsFragmentizerResponse(fragmentizerReq.fileId()));
-                        }
-                    }
-                    finally {
-                        rw.readUnlock();
-                    }
-                }
-                else {
-                    assert req.get2() instanceof GridGgfsSyncMessage;
-
-                    GridGgfsSyncMessage syncMsg = 
(GridGgfsSyncMessage)req.get2();
-
-                    if (!syncMsg.response()) {
-                        GridGgfsSyncMessage res = new 
GridGgfsSyncMessage(syncMsg.order(), true);
-
-                        if (log.isDebugEnabled())
-                            log.debug("Sending fragmentizer sync response to 
remote node [nodeId=" + nodeId +
-                                ", res=" + res + ']');
-
-                        sendResponse(nodeId, res);
-                    }
-                }
-            }
-        }
-
-        /**
-         * Sends response to remote node.
-         *
-         * @param nodeId Node ID to send response to.
-         * @param msg Message to send.
-         */
-        private void sendResponse(UUID nodeId, GridGgfsCommunicationMessage 
msg) {
-            try {
-                sendWithRetries(nodeId, msg);
-            }
-            catch (IgniteCheckedException e) {
-                if (e.hasCause(ClusterTopologyException.class)) {
-                    if (log.isDebugEnabled())
-                        log.debug("Failed to send sync response to GGFS 
fragmentizer coordinator " +
-                            "(originating node left the grid): " + nodeId);
-                }
-                else
-                    U.error(log, "Failed to send sync response to GGFS 
fragmentizer coordinator: " + nodeId, e);
-            }
-        }
-    }
-
-    /**
-     * Hash set that overrides equals to use identity comparison.
-     */
-    private static class IdentityHashSet extends GridConcurrentHashSet<UUID> {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /**
-         * Constructor.
-         *
-         * @param c Collection to add.
-         */
-        private IdentityHashSet(Collection<UUID> c) {
-            super(c);
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean equals(Object o) {
-            // Identity comparison.
-            return this == o;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsFragmentizerRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsFragmentizerRequest.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsFragmentizerRequest.java
deleted file mode 100644
index 0a81927..0000000
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsFragmentizerRequest.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.gridgain.grid.kernal.processors.ggfs;
-
-import org.apache.ignite.internal.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.internal.util.direct.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.internal.util.tostring.*;
-
-import java.io.*;
-import java.nio.*;
-import java.util.*;
-
-/**
- * Fragmentizer request. Sent from coordinator to other GGFS nodes when 
colocated part of file
- * should be fragmented.
- */
-public class GridGgfsFragmentizerRequest extends GridGgfsCommunicationMessage {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** File id. */
-    private IgniteUuid fileId;
-
-    /** Ranges to fragment. */
-    @GridToStringInclude
-    @GridDirectCollection(GridGgfsFileAffinityRange.class)
-    private Collection<GridGgfsFileAffinityRange> fragmentRanges;
-
-    /**
-     * Empty constructor required by {@link Externalizable}.
-     */
-    public GridGgfsFragmentizerRequest() {
-        // No-op.
-    }
-
-    /**
-     * @param fileId File id to fragment.
-     * @param fragmentRanges Ranges to fragment.
-     */
-    public GridGgfsFragmentizerRequest(IgniteUuid fileId, 
Collection<GridGgfsFileAffinityRange> fragmentRanges) {
-        this.fileId = fileId;
-        this.fragmentRanges = fragmentRanges;
-    }
-
-    /**
-     * @return File ID.
-     */
-    public IgniteUuid fileId() {
-        return fileId;
-    }
-
-    /**
-     * @return Fragment ranges.
-     */
-    public Collection<GridGgfsFileAffinityRange> fragmentRanges() {
-        return fragmentRanges;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridGgfsFragmentizerRequest.class, this);
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
-    @Override public GridTcpCommunicationMessageAdapter clone() {
-        GridGgfsFragmentizerRequest _clone = new GridGgfsFragmentizerRequest();
-
-        clone0(_clone);
-
-        return _clone;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
-        super.clone0(_msg);
-
-        GridGgfsFragmentizerRequest _clone = (GridGgfsFragmentizerRequest)_msg;
-
-        _clone.fileId = fileId;
-        _clone.fragmentRanges = fragmentRanges;
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("all")
-    @Override public boolean writeTo(ByteBuffer buf) {
-        commState.setBuffer(buf);
-
-        if (!super.writeTo(buf))
-            return false;
-
-        if (!commState.typeWritten) {
-            if (!commState.putByte(directType()))
-                return false;
-
-            commState.typeWritten = true;
-        }
-
-        switch (commState.idx) {
-            case 0:
-                if (!commState.putGridUuid(fileId))
-                    return false;
-
-                commState.idx++;
-
-            case 1:
-                if (fragmentRanges != null) {
-                    if (commState.it == null) {
-                        if (!commState.putInt(fragmentRanges.size()))
-                            return false;
-
-                        commState.it = fragmentRanges.iterator();
-                    }
-
-                    while (commState.it.hasNext() || commState.cur != NULL) {
-                        if (commState.cur == NULL)
-                            commState.cur = commState.it.next();
-
-                        if 
(!commState.putMessage((GridGgfsFileAffinityRange)commState.cur))
-                            return false;
-
-                        commState.cur = NULL;
-                    }
-
-                    commState.it = null;
-                } else {
-                    if (!commState.putInt(-1))
-                        return false;
-                }
-
-                commState.idx++;
-
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("all")
-    @Override public boolean readFrom(ByteBuffer buf) {
-        commState.setBuffer(buf);
-
-        if (!super.readFrom(buf))
-            return false;
-
-        switch (commState.idx) {
-            case 0:
-                IgniteUuid fileId0 = commState.getGridUuid();
-
-                if (fileId0 == GRID_UUID_NOT_READ)
-                    return false;
-
-                fileId = fileId0;
-
-                commState.idx++;
-
-            case 1:
-                if (commState.readSize == -1) {
-                    if (buf.remaining() < 4)
-                        return false;
-
-                    commState.readSize = commState.getInt();
-                }
-
-                if (commState.readSize >= 0) {
-                    if (fragmentRanges == null)
-                        fragmentRanges = new ArrayList<>(commState.readSize);
-
-                    for (int i = commState.readItems; i < commState.readSize; 
i++) {
-                        Object _val = commState.getMessage();
-
-                        if (_val == MSG_NOT_READ)
-                            return false;
-
-                        fragmentRanges.add((GridGgfsFileAffinityRange)_val);
-
-                        commState.readItems++;
-                    }
-                }
-
-                commState.readSize = -1;
-                commState.readItems = 0;
-
-                commState.idx++;
-
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public byte directType() {
-        return 70;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsFragmentizerResponse.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsFragmentizerResponse.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsFragmentizerResponse.java
deleted file mode 100644
index 653c5b8..0000000
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsFragmentizerResponse.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.gridgain.grid.kernal.processors.ggfs;
-
-import org.apache.ignite.lang.*;
-import org.apache.ignite.internal.util.direct.*;
-
-import java.io.*;
-import java.nio.*;
-
-/**
- * Fragmentizer response.
- */
-public class GridGgfsFragmentizerResponse extends GridGgfsCommunicationMessage 
{
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** File ID. */
-    private IgniteUuid fileId;
-
-    /**
-     * Empty constructor required for {@link Externalizable}.
-     */
-    public GridGgfsFragmentizerResponse() {
-        // No-op.
-    }
-
-    /**
-     * @param fileId File ID.
-     */
-    public GridGgfsFragmentizerResponse(IgniteUuid fileId) {
-        this.fileId = fileId;
-    }
-
-    /**
-     * @return File ID.
-     */
-    public IgniteUuid fileId() {
-        return fileId;
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
-    @Override public GridTcpCommunicationMessageAdapter clone() {
-        GridGgfsFragmentizerResponse _clone = new 
GridGgfsFragmentizerResponse();
-
-        clone0(_clone);
-
-        return _clone;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
-        super.clone0(_msg);
-
-        GridGgfsFragmentizerResponse _clone = 
(GridGgfsFragmentizerResponse)_msg;
-
-        _clone.fileId = fileId;
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("all")
-    @Override public boolean writeTo(ByteBuffer buf) {
-        commState.setBuffer(buf);
-
-        if (!super.writeTo(buf))
-            return false;
-
-        if (!commState.typeWritten) {
-            if (!commState.putByte(directType()))
-                return false;
-
-            commState.typeWritten = true;
-        }
-
-        switch (commState.idx) {
-            case 0:
-                if (!commState.putGridUuid(fileId))
-                    return false;
-
-                commState.idx++;
-
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("all")
-    @Override public boolean readFrom(ByteBuffer buf) {
-        commState.setBuffer(buf);
-
-        if (!super.readFrom(buf))
-            return false;
-
-        switch (commState.idx) {
-            case 0:
-                IgniteUuid fileId0 = commState.getGridUuid();
-
-                if (fileId0 == GRID_UUID_NOT_READ)
-                    return false;
-
-                fileId = fileId0;
-
-                commState.idx++;
-
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public byte directType() {
-        return 71;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsHandshakeResponse.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsHandshakeResponse.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsHandshakeResponse.java
deleted file mode 100644
index ea8a27f..0000000
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsHandshakeResponse.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.gridgain.grid.kernal.processors.ggfs;
-
-import org.apache.ignite.internal.util.typedef.internal.*;
-
-import java.io.*;
-
-/**
- * Handshake message.
- */
-public class GridGgfsHandshakeResponse implements Externalizable {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** GGFS name. */
-    private String ggfsName;
-
-    /** SECONDARY paths. */
-    private GridGgfsPaths paths;
-
-    /** Server block size. */
-    private long blockSize;
-
-    /** Whether to force sampling on client's side. */
-    private Boolean sampling;
-
-    /**
-     * {@link Externalizable} support.
-     */
-    public GridGgfsHandshakeResponse() {
-        // No-op.
-    }
-
-    /**
-     * Constructor.
-     *
-     * @param paths Secondary paths.
-     * @param blockSize Server default block size.
-     */
-    public GridGgfsHandshakeResponse(String ggfsName, GridGgfsPaths paths, 
long blockSize, Boolean sampling) {
-        assert paths != null;
-
-        this.ggfsName = ggfsName;
-        this.paths = paths;
-        this.blockSize = blockSize;
-        this.sampling = sampling;
-    }
-
-    /**
-     * @return GGFS name.
-     */
-    public String ggfsName() {
-        return ggfsName;
-    }
-
-    /**
-     * @return SECONDARY paths configured on server.
-     */
-    public GridGgfsPaths secondaryPaths() {
-        return paths;
-    }
-
-    /**
-     * @return Server default block size.
-     */
-    public long blockSize() {
-        return blockSize;
-    }
-
-    /**
-     * @return Sampling flag.
-     */
-    public Boolean sampling() {
-        return sampling;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeExternal(ObjectOutput out) throws IOException {
-        U.writeString(out, ggfsName);
-
-        paths.writeExternal(out);
-
-        out.writeLong(blockSize);
-
-        if (sampling != null) {
-            out.writeBoolean(true);
-            out.writeBoolean(sampling);
-        }
-        else
-            out.writeBoolean(false);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
-        ggfsName = U.readString(in);
-
-        paths = new GridGgfsPaths();
-
-        paths.readExternal(in);
-
-        blockSize = in.readLong();
-
-        if (in.readBoolean())
-            sampling = in.readBoolean();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsHelper.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsHelper.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsHelper.java
deleted file mode 100644
index 3c41ba2..0000000
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsHelper.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.gridgain.grid.kernal.processors.ggfs;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
-
-/**
- * GGFS utility processor adapter.
- */
-public interface GridGgfsHelper {
-    /**
-     * Pre-process cache configuration.
-     *
-     * @param cfg Cache configuration.
-     */
-    public abstract void preProcessCacheConfiguration(CacheConfiguration cfg);
-
-    /**
-     * Validate cache configuration for GGFS.
-     *
-     * @param cfg Cache configuration.
-     * @throws IgniteCheckedException If validation failed.
-     */
-    public abstract void validateCacheConfiguration(CacheConfiguration cfg) 
throws IgniteCheckedException;
-
-    /**
-     * Check whether object is of type {@code GridGgfsBlockKey}
-     *
-     * @param key Key.
-     * @return {@code True} if GGFS block key.
-     */
-    public abstract boolean isGgfsBlockKey(Object key);
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsHelperImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsHelperImpl.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsHelperImpl.java
deleted file mode 100644
index e343a73..0000000
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsHelperImpl.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.gridgain.grid.kernal.processors.ggfs;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
-import org.apache.ignite.cache.eviction.*;
-import org.apache.ignite.cache.eviction.ggfs.*;
-
-/**
- * GGFS utils processor.
- */
-public class GridGgfsHelperImpl implements GridGgfsHelper {
-    /** {@inheritDoc} */
-    @Override public void preProcessCacheConfiguration(CacheConfiguration cfg) 
{
-        GridCacheEvictionPolicy evictPlc = cfg.getEvictionPolicy();
-
-        if (evictPlc instanceof GridCacheGgfsPerBlockLruEvictionPolicy && 
cfg.getEvictionFilter() == null)
-            cfg.setEvictionFilter(new GridCacheGgfsEvictionFilter());
-    }
-
-    /** {@inheritDoc} */
-    @Override public void validateCacheConfiguration(CacheConfiguration cfg) 
throws IgniteCheckedException {
-        GridCacheEvictionPolicy evictPlc =  cfg.getEvictionPolicy();
-
-        if (evictPlc != null && evictPlc instanceof 
GridCacheGgfsPerBlockLruEvictionPolicy) {
-            GridCacheEvictionFilter evictFilter = cfg.getEvictionFilter();
-
-            if (evictFilter != null && !(evictFilter instanceof 
GridCacheGgfsEvictionFilter))
-                throw new IgniteCheckedException("Eviction filter cannot be 
set explicitly when using " +
-                    "GridCacheGgfsPerBlockLruEvictionPolicy:" + cfg.getName());
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean isGgfsBlockKey(Object key) {
-        return key instanceof GridGgfsBlockKey;
-    }
-}

Reply via email to