http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsDeleteWorker.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsDeleteWorker.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsDeleteWorker.java deleted file mode 100644 index 1a06365..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsDeleteWorker.java +++ /dev/null @@ -1,350 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.gridgain.grid.kernal.processors.ggfs; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.events.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.internal.managers.communication.*; -import org.apache.ignite.internal.managers.eventstorage.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.internal.util.future.*; - -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.locks.*; - -import static org.apache.ignite.events.IgniteEventType.*; -import static org.apache.ignite.internal.GridTopic.*; -import static org.gridgain.grid.kernal.processors.ggfs.GridGgfsFileInfo.*; - -/** - * GGFS worker for removal from the trash directory. - */ -public class GridGgfsDeleteWorker extends GridGgfsThread { - /** Awake frequency, */ - private static final long FREQUENCY = 1000; - - /** How many files/folders to delete at once (i.e in a single transaction). */ - private static final int MAX_DELETE_BATCH = 100; - - /** GGFS context. */ - private final GridGgfsContext ggfsCtx; - - /** Metadata manager. */ - private final GridGgfsMetaManager meta; - - /** Data manager. */ - private final GridGgfsDataManager data; - - /** Event manager. */ - private final GridEventStorageManager evts; - - /** Logger. */ - private final IgniteLogger log; - - /** Lock. */ - private final Lock lock = new ReentrantLock(); - - /** Condition. */ - private final Condition cond = lock.newCondition(); - - /** Force worker to perform actual delete. */ - private boolean force; - - /** Cancellation flag. */ - private volatile boolean cancelled; - - /** Message topic. */ - private Object topic; - - /** - * Constructor. - * - * @param ggfsCtx GGFS context. 
- */ - GridGgfsDeleteWorker(GridGgfsContext ggfsCtx) { - super("ggfs-delete-worker%" + ggfsCtx.ggfs().name() + "%" + ggfsCtx.kernalContext().localNodeId() + "%"); - - this.ggfsCtx = ggfsCtx; - - meta = ggfsCtx.meta(); - data = ggfsCtx.data(); - - evts = ggfsCtx.kernalContext().event(); - - String ggfsName = ggfsCtx.ggfs().name(); - - topic = F.isEmpty(ggfsName) ? TOPIC_GGFS : TOPIC_GGFS.topic(ggfsName); - - assert meta != null; - assert data != null; - - log = ggfsCtx.kernalContext().log(GridGgfsDeleteWorker.class); - } - - /** {@inheritDoc} */ - @Override protected void body() throws InterruptedException { - if (log.isDebugEnabled()) - log.debug("Delete worker started."); - - while (!cancelled) { - lock.lock(); - - try { - if (!cancelled && !force) - cond.await(FREQUENCY, TimeUnit.MILLISECONDS); - - force = false; // Reset force flag. - } - finally { - lock.unlock(); - } - - if (!cancelled) - delete(); - } - } - - /** - * Notify the worker that new entry to delete appeared. - */ - void signal() { - lock.lock(); - - try { - force = true; - - cond.signalAll(); - } - finally { - lock.unlock(); - } - } - - void cancel() { - cancelled = true; - - interrupt(); - } - - /** - * Perform cleanup of the trash directory. 
- */ - private void delete() { - GridGgfsFileInfo info = null; - - try { - info = meta.info(TRASH_ID); - } - catch (IgniteCheckedException e) { - U.error(log, "Cannot obtain trash directory info.", e); - } - - if (info != null) { - for (Map.Entry<String, GridGgfsListingEntry> entry : info.listing().entrySet()) { - IgniteUuid fileId = entry.getValue().fileId(); - - if (log.isDebugEnabled()) - log.debug("Deleting GGFS trash entry [name=" + entry.getKey() + ", fileId=" + fileId + ']'); - - try { - if (!cancelled) { - if (delete(entry.getKey(), fileId)) { - if (log.isDebugEnabled()) - log.debug("Sending delete confirmation message [name=" + entry.getKey() + - ", fileId=" + fileId + ']'); - - sendDeleteMessage(new GridGgfsDeleteMessage(fileId)); - } - } - else - break; - } - catch (IgniteInterruptedException ignored) { - // Ignore this exception while stopping. - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to delete entry from the trash directory: " + entry.getKey(), e); - - sendDeleteMessage(new GridGgfsDeleteMessage(fileId, e)); - } - } - } - } - - /** - * Remove particular entry from the TRASH directory. - * - * @param name Entry name. - * @param id Entry ID. - * @return {@code True} in case the entry really was deleted form the file system by this call. - * @throws IgniteCheckedException If failed. - */ - private boolean delete(String name, IgniteUuid id) throws IgniteCheckedException { - assert name != null; - assert id != null; - - while (true) { - GridGgfsFileInfo info = meta.info(id); - - if (info != null) { - if (info.isDirectory()) { - deleteDirectory(TRASH_ID, id); - - if (meta.delete(TRASH_ID, name, id)) - return true; - } - else { - assert info.isFile(); - - // Delete file content first. - // In case this node crashes, other node will re-delete the file. 
- data.delete(info).get(); - - boolean ret = meta.delete(TRASH_ID, name, id); - - if (evts.isRecordable(EVT_GGFS_FILE_PURGED)) { - if (info.path() != null) - evts.record(new IgniteFsEvent(info.path(), - ggfsCtx.kernalContext().discovery().localNode(), EVT_GGFS_FILE_PURGED)); - else - LT.warn(log, null, "Removing file without path info: " + info); - } - - return ret; - } - } - else - return false; // Entry was deleted concurrently. - } - } - - /** - * Remove particular entry from the trash directory or subdirectory. - * - * @param parentId Parent ID. - * @param id Entry id. - * @throws IgniteCheckedException If delete failed for some reason. - */ - private void deleteDirectory(IgniteUuid parentId, IgniteUuid id) throws IgniteCheckedException { - assert parentId != null; - assert id != null; - - while (true) { - GridGgfsFileInfo info = meta.info(id); - - if (info != null) { - assert info.isDirectory(); - - Map<String, GridGgfsListingEntry> listing = info.listing(); - - if (listing.isEmpty()) - return; // Directory is empty. - - Map<String, GridGgfsListingEntry> delListing; - - if (listing.size() <= MAX_DELETE_BATCH) - delListing = listing; - else { - delListing = new HashMap<>(MAX_DELETE_BATCH, 1.0f); - - int i = 0; - - for (Map.Entry<String, GridGgfsListingEntry> entry : listing.entrySet()) { - delListing.put(entry.getKey(), entry.getValue()); - - if (++i == MAX_DELETE_BATCH) - break; - } - } - - GridCompoundFuture<Object, ?> fut = new GridCompoundFuture<>(ggfsCtx.kernalContext()); - - // Delegate to child folders. - for (GridGgfsListingEntry entry : delListing.values()) { - if (!cancelled) { - if (entry.isDirectory()) - deleteDirectory(id, entry.fileId()); - else { - GridGgfsFileInfo fileInfo = meta.info(entry.fileId()); - - if (fileInfo != null) { - assert fileInfo.isFile(); - - fut.add(data.delete(fileInfo)); - } - } - } - else - return; - } - - fut.markInitialized(); - - // Wait for data cache to delete values before clearing meta cache. 
- try { - fut.get(); - } - catch (IgniteFutureCancelledException ignore) { - // This future can be cancelled only due to GGFS shutdown. - cancelled = true; - - return; - } - - // Actual delete of folder content. - Collection<IgniteUuid> delIds = meta.delete(id, delListing); - - if (delListing == listing && delListing.size() == delIds.size()) - break; // All entries were deleted. - } - else - break; // Entry was deleted concurrently. - } - } - - /** - * Send delete message to all meta cache nodes in the grid. - * - * @param msg Message to send. - */ - private void sendDeleteMessage(GridGgfsDeleteMessage msg) { - assert msg != null; - - Collection<ClusterNode> nodes = meta.metaCacheNodes(); - - boolean first = true; - - for (ClusterNode node : nodes) { - GridGgfsCommunicationMessage msg0 = first ? msg : (GridGgfsCommunicationMessage)msg.clone(); - - first = false; - - try { - ggfsCtx.send(node, topic, msg0, GridIoPolicy.SYSTEM_POOL); - } - catch (IgniteCheckedException e) { - U.warn(log, "Failed to send GGFS delete message to node [nodeId=" + node.id() + - ", msg=" + msg + ", err=" + e.getMessage() + ']'); - } - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsDirectoryNotEmptyException.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsDirectoryNotEmptyException.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsDirectoryNotEmptyException.java deleted file mode 100644 index 3c5a932..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsDirectoryNotEmptyException.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.ggfs; - -import org.apache.ignite.fs.*; - -/** - * Exception indicating that directory can not be deleted because it is not empty. - */ -public class GridGgfsDirectoryNotEmptyException extends IgniteFsException { - /** */ - private static final long serialVersionUID = 0L; - - /** - * @param msg Exception message. 
- */ - public GridGgfsDirectoryNotEmptyException(String msg) { - super(msg); - } - - /** - * Creates an instance of GGFS exception caused by nested exception. - * - * @param cause Exception cause. - */ - public GridGgfsDirectoryNotEmptyException(Throwable cause) { - super(cause); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsEx.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsEx.java deleted file mode 100644 index eec745a..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsEx.java +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.ggfs; - -import org.apache.ignite.*; -import org.apache.ignite.fs.*; -import org.apache.ignite.lang.*; -import org.gridgain.grid.*; -import org.jetbrains.annotations.*; - -import java.net.*; - -/** - * Internal API extension for {@link org.apache.ignite.IgniteFs}. 
- */ -public interface GridGgfsEx extends IgniteFs { - /** - * Stops GGFS cleaning all used resources. - */ - public void stop(); - - /** - * @return GGFS context. - */ - public GridGgfsContext context(); - - /** - * Get handshake message. - * - * @return Handshake message. - */ - public GridGgfsPaths proxyPaths(); - - /** {@inheritDoc} */ - @Override GridGgfsInputStreamAdapter open(IgniteFsPath path, int bufSize, int seqReadsBeforePrefetch) - throws IgniteCheckedException; - - /** {@inheritDoc} */ - @Override GridGgfsInputStreamAdapter open(IgniteFsPath path) throws IgniteCheckedException; - - /** {@inheritDoc} */ - @Override GridGgfsInputStreamAdapter open(IgniteFsPath path, int bufSize) throws IgniteCheckedException; - - /** - * Gets global space counters. - * - * @return Tuple in which first component is used space on all nodes, - * second is available space on all nodes. - * @throws IgniteCheckedException If task execution failed. - */ - public GridGgfsStatus globalSpace() throws IgniteCheckedException; - - /** - * Enables, disables or clears sampling flag. - * - * @param val {@code True} to turn on sampling, {@code false} to turn it off, {@code null} to clear sampling state. - * @throws IgniteCheckedException If failed. - */ - public void globalSampling(@Nullable Boolean val) throws IgniteCheckedException; - - /** - * Get sampling state. - * - * @return {@code True} in case sampling is enabled, {@code false} otherwise, or {@code null} in case sampling - * flag is not set. - */ - @Nullable public Boolean globalSampling(); - - /** - * Get local metrics. - * - * @return Local metrics. - */ - public GridGgfsLocalMetrics localMetrics(); - - /** - * Gets group block size, i.e. block size multiplied by group size in affinity mapper. - * - * @return Group block size. - */ - public long groupBlockSize(); - - /** - * Asynchronously await for all entries existing in trash to be removed. 
- * - * @return Future which will be completed when all entries existed in trash by the time of invocation are removed. - * @throws IgniteCheckedException If failed. - */ - public IgniteFuture<?> awaitDeletesAsync() throws IgniteCheckedException; - - /** - * Gets client file system log directory. - * - * @return Client file system log directory or {@code null} in case no client connections have been created yet. - */ - @Nullable public String clientLogDirectory(); - - /** - * Sets client file system log directory. - * - * @param logDir Client file system log directory. - */ - public void clientLogDirectory(String logDir); - - /** - * Whether this path is excluded from evictions. - * - * @param path Path. - * @param primary Whether the mode is PRIMARY. - * @return {@code True} if path is excluded from evictions. - */ - public boolean evictExclude(IgniteFsPath path, boolean primary); - - /** - * Get next affinity key. - * - * @return Next affinity key. - */ - public IgniteUuid nextAffinityKey(); - - /** - * Check whether the given path is proxy path. - * - * @param path Path. - * @return {@code True} if proxy. - */ - public boolean isProxy(URI path); -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsFileAffinityRange.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsFileAffinityRange.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsFileAffinityRange.java deleted file mode 100644 index 694e70a..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsFileAffinityRange.java +++ /dev/null @@ -1,396 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.ggfs; - -import org.apache.ignite.lang.*; -import org.apache.ignite.internal.util.direct.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.nio.*; -import java.util.*; - -/** - * Affinity range. - */ -public class GridGgfsFileAffinityRange extends GridTcpCommunicationMessageAdapter implements Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Initial range status, right after creation. */ - public static final int RANGE_STATUS_INITIAL = 0; - - /** Moving range state. Fragmentizer started blocks copy. */ - public static final int RANGE_STATUS_MOVING = 1; - - /** Fragmentizer finished block copy for this range. */ - public static final int RANGE_STATUS_MOVED = 2; - - /** Range affinity key. */ - private IgniteUuid affKey; - - /** {@code True} if currently being moved by fragmentizer. */ - @SuppressWarnings("RedundantFieldInitialization") - private int status = RANGE_STATUS_INITIAL; - - /** Range start offset (divisible by block size). */ - private long startOff; - - /** Range end offset (endOff + 1 divisible by block size). 
*/ - private long endOff; - - /** Transient flag indicating no further writes should be made to this range. */ - private boolean done; - - /** - * Empty constructor required by {@link Externalizable}. - */ - public GridGgfsFileAffinityRange() { - // No-op. - } - - /** - * @param startOff Start offset. - * @param endOff End offset. - * @param affKey Affinity key. - */ - GridGgfsFileAffinityRange(long startOff, long endOff, IgniteUuid affKey) { - this.startOff = startOff; - this.endOff = endOff; - this.affKey = affKey; - } - - /** - * Creates new range with updated status. - * - * @param other Initial range. - * @param status Updated status. - */ - GridGgfsFileAffinityRange(GridGgfsFileAffinityRange other, int status) { - startOff = other.startOff; - endOff = other.endOff; - affKey = other.affKey; - - this.status = status; - } - - /** - * @return Affinity key for this range. - */ - public IgniteUuid affinityKey() { - return affKey; - } - - /** - * @return Range start offset. - */ - public long startOffset() { - return startOff; - } - - /** - * @return Range end offset. - */ - public long endOffset() { - return endOff; - } - - /** - * @param blockStartOff Block start offset to check. - * @return {@code True} if block with given start offset belongs to this range. - */ - public boolean belongs(long blockStartOff) { - return blockStartOff >= startOff && blockStartOff < endOff; - } - - /** - * @param blockStartOff Block start offset to check. - * @return {@code True} if block with given start offset is located before this range. - */ - public boolean less(long blockStartOff) { - return blockStartOff < startOff; - } - - /** - * @param blockStartOff Block start offset to check. - * @return {@code True} if block with given start offset is located after this range. - */ - public boolean greater(long blockStartOff) { - return blockStartOff > endOff; - } - - /** - * @return If range is empty, i.e. has zero length. 
- */ - public boolean empty() { - return startOff == endOff; - } - - /** - * @return Range status. - */ - public int status() { - return status; - } - - /** - * Expands this range by given block. - * - * @param blockStartOff Offset of block start. - * @param expansionSize Block size. - */ - public void expand(long blockStartOff, int expansionSize) { - // If we are expanding empty range. - if (endOff == startOff) { - assert endOff == blockStartOff : "Failed to expand range [endOff=" + endOff + - ", blockStartOff=" + blockStartOff + ", expansionSize=" + expansionSize + ']'; - - endOff += expansionSize - 1; - } - else { - assert endOff == blockStartOff - 1; - - endOff += expansionSize; - } - } - - /** - * Splits range into collection if smaller ranges with length equal to {@code maxSize}. - * - * @param maxSize Split part maximum size. - * @return Collection of range parts. - */ - public Collection<GridGgfsFileAffinityRange> split(long maxSize) { - long len = endOff - startOff + 1; - - if (len > maxSize) { - int size = (int)(len / maxSize + 1); - - Collection<GridGgfsFileAffinityRange> res = new ArrayList<>(size); - - long pos = startOff; - - while (pos < endOff + 1) { - long end = Math.min(pos + maxSize - 1, endOff); - - GridGgfsFileAffinityRange part = new GridGgfsFileAffinityRange(pos, end, affKey); - - part.status = status; - - res.add(part); - - pos = end + 1; - } - - return res; - } - else - return Collections.singletonList(this); - } - - /** - * Tries to concatenate this range with a given one. If ranges are not adjacent, will return {@code null}. - * - * @param range Range to concatenate with. - * @return Concatenation result or {@code null} if ranges are not adjacent. 
- */ - @Nullable public GridGgfsFileAffinityRange concat(GridGgfsFileAffinityRange range) { - if (endOff + 1 != range.startOff || !F.eq(affKey, range.affKey) || status != RANGE_STATUS_INITIAL) - return null; - - return new GridGgfsFileAffinityRange(startOff, range.endOff, affKey); - } - - /** - * Marks this range as done. - */ - public void markDone() { - done = true; - } - - /** - * @return Done flag. - */ - public boolean done() { - return done; - } - - /** - * Checks if range regions are equal. - * - * @param other Other range to check against. - * @return {@code True} if range regions are equal. - */ - public boolean regionEqual(GridGgfsFileAffinityRange other) { - return startOff == other.startOff && endOff == other.endOff; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeGridUuid(out, affKey); - - out.writeInt(status); - - out.writeLong(startOff); - out.writeLong(endOff); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - affKey = U.readGridUuid(in); - - status = in.readInt(); - - startOff = in.readLong(); - endOff = in.readLong(); - } - - /** {@inheritDoc} */ - @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"}) - @Override public GridTcpCommunicationMessageAdapter clone() { - GridGgfsFileAffinityRange _clone = new GridGgfsFileAffinityRange(); - - clone0(_clone); - - return _clone; - } - - /** {@inheritDoc} */ - @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) { - GridGgfsFileAffinityRange _clone = (GridGgfsFileAffinityRange)_msg; - - _clone.affKey = affKey; - _clone.status = status; - _clone.startOff = startOff; - _clone.endOff = endOff; - _clone.done = done; - } - - /** {@inheritDoc} */ - @SuppressWarnings("fallthrough") - @Override public boolean writeTo(ByteBuffer buf) { - commState.setBuffer(buf); - - if (!commState.typeWritten) { - if 
(!commState.putByte(directType())) - return false; - - commState.typeWritten = true; - } - - switch (commState.idx) { - case 0: - if (!commState.putGridUuid(affKey)) - return false; - - commState.idx++; - - case 1: - if (!commState.putBoolean(done)) - return false; - - commState.idx++; - - case 2: - if (!commState.putLong(endOff)) - return false; - - commState.idx++; - - case 3: - if (!commState.putLong(startOff)) - return false; - - commState.idx++; - - case 4: - if (!commState.putInt(status)) - return false; - - commState.idx++; - - } - - return true; - } - - /** {@inheritDoc} */ - @SuppressWarnings("fallthrough") - @Override public boolean readFrom(ByteBuffer buf) { - commState.setBuffer(buf); - - switch (commState.idx) { - case 0: - IgniteUuid affKey0 = commState.getGridUuid(); - - if (affKey0 == GRID_UUID_NOT_READ) - return false; - - affKey = affKey0; - - commState.idx++; - - case 1: - if (buf.remaining() < 1) - return false; - - done = commState.getBoolean(); - - commState.idx++; - - case 2: - if (buf.remaining() < 8) - return false; - - endOff = commState.getLong(); - - commState.idx++; - - case 3: - if (buf.remaining() < 8) - return false; - - startOff = commState.getLong(); - - commState.idx++; - - case 4: - if (buf.remaining() < 4) - return false; - - status = commState.getInt(); - - commState.idx++; - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public byte directType() { - return 69; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridGgfsFileAffinityRange.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsFileInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsFileInfo.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsFileInfo.java 
deleted file mode 100644 index bc95e83..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsFileInfo.java +++ /dev/null @@ -1,568 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.ggfs; - -import org.apache.ignite.fs.*; -import org.apache.ignite.internal.util.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.internal.util.tostring.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; - -/** - * Unmodifiable file information. - */ -public final class GridGgfsFileInfo implements Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** ID for the root directory. */ - public static final IgniteUuid ROOT_ID = new IgniteUuid(new UUID(0, 0), 0); - - /** ID of the trash directory. */ - public static final IgniteUuid TRASH_ID = new IgniteUuid(new UUID(0, 1), 0); - - /** Info ID. */ - private IgniteUuid id; - - /** File length in bytes. */ - private long len; - - /** File block size, {@code zero} for directories. 
*/ - private int blockSize; - - /** File properties. */ - private Map<String, String> props; - - /** File lock ID. */ - private IgniteUuid lockId; - - /** Affinity key used for single-node file collocation. */ - private IgniteUuid affKey; - - /** File affinity map. */ - private GridGgfsFileMap fileMap; - - /** Last access time. Modified on-demand. */ - private long accessTime; - - /** Last modification time. */ - private long modificationTime; - - /** Directory listing. */ - @GridToStringInclude - private Map<String, GridGgfsListingEntry> listing; - - /** Whether data blocks of this entry should never be excluded. */ - private boolean evictExclude; - - /** - * Original file path. This is a helper field used only in some - * operations like delete. - */ - private IgniteFsPath path; - - /** - * {@link Externalizable} support. - */ - public GridGgfsFileInfo() { - this(ROOT_ID); - } - - /** - * Constructs directory file info with the given ID. - * - * @param id ID. - */ - GridGgfsFileInfo(IgniteUuid id) { - this(true, id, 0, 0, null, null, null, null, false, System.currentTimeMillis(), false); - } - - /** - * Constructs directory or file info with {@link org.apache.ignite.fs.IgniteFsConfiguration#DFLT_BLOCK_SIZE default} block size. - * - * @param isDir Constructs directory info if {@code true} or file info if {@code false}. - * @param props Meta properties to set. - */ - public GridGgfsFileInfo(boolean isDir, @Nullable Map<String, String> props) { - this(isDir, null, isDir ? 0 : IgniteFsConfiguration.DFLT_BLOCK_SIZE, 0, null, null, props, null, false, - System.currentTimeMillis(), false); - } - - /** - * Consturcts directory with random ID and provided listing. - * - * @param listing Listing. - */ - GridGgfsFileInfo(Map<String, GridGgfsListingEntry> listing) { - this(true, null, 0, 0, null, listing, null, null, false, System.currentTimeMillis(), false); - } - - /** - * Constructs file info. - * - * @param blockSize Block size. - * @param affKey Affinity key. 
- * @param evictExclude Eviction exclude flag. - * @param props File properties. - */ - GridGgfsFileInfo(int blockSize, @Nullable IgniteUuid affKey, boolean evictExclude, - @Nullable Map<String, String> props) { - this(false, null, blockSize, 0, affKey, null, props, null, true, System.currentTimeMillis(), evictExclude); - } - - /** - * Constructs file info. - * - * @param blockSize Block size. - * @param len Length. - * @param affKey Affinity key. - * @param lockId Lock ID. - * @param props Properties. - * @param evictExclude Evict exclude flag. - */ - public GridGgfsFileInfo(int blockSize, long len, @Nullable IgniteUuid affKey, @Nullable IgniteUuid lockId, - boolean evictExclude, @Nullable Map<String, String> props) { - this(false, null, blockSize, len, affKey, null, props, lockId, true, System.currentTimeMillis(), evictExclude); - } - - /** - * Constructs file information. - * - * @param info File information to copy data from. - * @param len Size of a file. - */ - GridGgfsFileInfo(GridGgfsFileInfo info, long len) { - this(info.isDirectory(), info.id, info.blockSize, len, info.affKey, info.listing, info.props, info.fileMap(), - info.lockId, true, info.accessTime, info.modificationTime, info.evictExclude()); - } - - /** - * Constructs file info. - * - * @param info File info. - * @param accessTime Last access time. - * @param modificationTime Last modification time. - */ - GridGgfsFileInfo(GridGgfsFileInfo info, long accessTime, long modificationTime) { - this(info.isDirectory(), info.id, info.blockSize, info.len, info.affKey, info.listing, info.props, - info.fileMap(), info.lockId, false, accessTime, modificationTime, info.evictExclude()); - } - - /** - * Constructs file information. - * - * @param info File information to copy data from. - * @param props File properties to set. 
- */ - GridGgfsFileInfo(GridGgfsFileInfo info, @Nullable Map<String, String> props) { - this(info.isDirectory(), info.id, info.blockSize, info.len, info.affKey, info.listing, props, - info.fileMap(), info.lockId, true, info.accessTime, info.modificationTime, info.evictExclude()); - } - - /** - * Constructs file info. - * - * @param blockSize Block size, - * @param len Size of a file. - * @param props File properties to set. - * @param evictExclude Evict exclude flag. - */ - GridGgfsFileInfo(int blockSize, long len, boolean evictExclude, @Nullable Map<String, String> props) { - this(false, null, blockSize, len, null, null, props, null, true, System.currentTimeMillis(), evictExclude); - } - - /** - * Constructs file information. - * - * @param info File information to copy data from. - * @param lockId Lock ID. - * @param modificationTime Last modification time. - */ - GridGgfsFileInfo(GridGgfsFileInfo info, @Nullable IgniteUuid lockId, long modificationTime) { - this(info.isDirectory(), info.id, info.blockSize, info.len, info.affKey, info.listing, info.props, - info.fileMap(), lockId, true, info.accessTime, modificationTime, info.evictExclude()); - } - - /** - * Constructs file info. - * - * @param listing New directory listing. - * @param old Old file info. - */ - GridGgfsFileInfo(Map<String, GridGgfsListingEntry> listing, GridGgfsFileInfo old) { - this(old.isDirectory(), old.id, old.blockSize, old.len, old.affKey, listing, old.props, old.fileMap(), - old.lockId, false, old.accessTime, old.modificationTime, old.evictExclude()); - } - - /** - * Constructs file info. - * - * @param isDir Constructs directory info if {@code true} or file info if {@code false}. - * @param id ID or {@code null} to generate it automatically. - * @param blockSize Block size. - * @param len Size of a file. - * @param affKey Affinity key for data blocks. - * @param listing Directory listing. - * @param props File properties. - * @param lockId Lock ID. 
- * @param cpProps Flag to copy properties map. - * @param modificationTime Last modification time. - * @param evictExclude Evict exclude flag. - */ - private GridGgfsFileInfo(boolean isDir, @Nullable IgniteUuid id, int blockSize, long len, @Nullable IgniteUuid affKey, - @Nullable Map<String, GridGgfsListingEntry> listing, @Nullable Map<String, String> props, - @Nullable IgniteUuid lockId, boolean cpProps, long modificationTime, boolean evictExclude) { - this(isDir, id, blockSize, len, affKey, listing, props, null, lockId, cpProps, modificationTime, - modificationTime, evictExclude); - } - - /** - * Constructs file info. - * - * @param isDir Constructs directory info if {@code true} or file info if {@code false}. - * @param id ID or {@code null} to generate it automatically. - * @param blockSize Block size. - * @param len Size of a file. - * @param affKey Affinity key for data blocks. - * @param listing Directory listing. - * @param props File properties. - * @param fileMap File map. - * @param lockId Lock ID. - * @param cpProps Flag to copy properties map. - * @param accessTime Last access time. - * @param modificationTime Last modification time. - * @param evictExclude Evict exclude flag. - */ - private GridGgfsFileInfo(boolean isDir, @Nullable IgniteUuid id, int blockSize, long len, @Nullable IgniteUuid affKey, - @Nullable Map<String, GridGgfsListingEntry> listing, @Nullable Map<String, String> props, - @Nullable GridGgfsFileMap fileMap, @Nullable IgniteUuid lockId, boolean cpProps, long accessTime, - long modificationTime, boolean evictExclude) { - assert F.isEmpty(listing) || isDir; - - if (isDir) { - assert len == 0 : "Directory length should be zero: " + len; - assert blockSize == 0 : "Directory block size should be zero: " + blockSize; - } - else { - assert len >= 0 : "File length cannot be negative: " + len; - assert blockSize > 0 : "File block size should be positive: " + blockSize; - } - - this.id = id == null ? 
IgniteUuid.randomUuid() : id; - this.len = isDir ? 0 : len; - this.blockSize = isDir ? 0 : blockSize; - this.affKey = affKey; - this.listing = listing; - - if (fileMap == null && !isDir) - fileMap = new GridGgfsFileMap(); - - this.fileMap = fileMap; - this.accessTime = accessTime; - this.modificationTime = modificationTime; - - // Always make a copy of passed properties collection to escape concurrent modifications. - this.props = props == null || props.isEmpty() ? null : - cpProps ? new GridLeanMap<>(props) : props; - - if (listing == null && isDir) - this.listing = Collections.emptyMap(); - - this.lockId = lockId; - this.evictExclude = evictExclude; - } - - /** - * A copy constructor, which takes all data from the specified - * object field-by-field. - * - * @param info An object to copy data info. - */ - public GridGgfsFileInfo(GridGgfsFileInfo info) { - this(info.isDirectory(), info.id, info.blockSize, info.len, info.affKey, info.listing, info.props, - info.fileMap(), info.lockId, true, info.accessTime, info.modificationTime, info.evictExclude()); - } - - /** - * Creates a builder for the new instance of file info. - * - * @return A builder to construct a new unmodifiable instance - * of this class. - */ - public static Builder builder() { - return new Builder(new GridGgfsFileInfo()); - } - - /** - * Creates a builder for the new instance of file info, - * based on the specified origin. - * - * @param origin An origin for new instance, from which - * the data will be copied. - * @return A builder to construct a new unmodifiable instance - * of this class. - */ - public static Builder builder(GridGgfsFileInfo origin) { - return new Builder(new GridGgfsFileInfo(origin)); - } - - /** - * Gets this item ID. - * - * @return This item ID. - */ - public IgniteUuid id() { - return id; - } - - /** - * @return {@code True} if this is a file. - */ - public boolean isFile() { - return blockSize > 0; - } - - /** - * @return {@code True} if this is a directory. 
- */ - public boolean isDirectory() { - return blockSize == 0; - } - - /** - * Get file size. - * - * @return File size. - */ - public long length() { - assert isFile(); - - return len; - } - - /** - * Get single data block size to store this file. - * - * @return Single data block size to store this file. - */ - public int blockSize() { - assert isFile(); - - return blockSize; - } - - /** - * @return Number of data blocks to store this file. - */ - public long blocksCount() { - assert isFile(); - - return (len + blockSize() - 1) / blockSize(); - } - - /** - * @return Last access time. - */ - public long accessTime() { - return accessTime; - } - - /** - * @return Last modification time. - */ - public long modificationTime() { - return modificationTime; - } - - /** - * @return Directory listing. - */ - public Map<String, GridGgfsListingEntry> listing() { - // Always wrap into unmodifiable map to be able to avoid illegal modifications in other pieces of the code. - if (isFile()) - return Collections.unmodifiableMap(Collections.<String, GridGgfsListingEntry>emptyMap()); - - assert listing != null; - - return Collections.unmodifiableMap(listing); - } - - /** - * @return Affinity key used for single-node file collocation. If {@code null}, usual - * mapper procedure is used for block affinity detection. - */ - @Nullable public IgniteUuid affinityKey() { - return affKey; - } - - /** - * @param affKey Affinity key used for single-node file collocation. - */ - public void affinityKey(IgniteUuid affKey) { - this.affKey = affKey; - } - - /** - * @return File affinity map. - */ - public GridGgfsFileMap fileMap() { - return fileMap; - } - - /** - * @param fileMap File affinity map. - */ - public void fileMap(GridGgfsFileMap fileMap) { - this.fileMap = fileMap; - } - - /** - * Get properties of the file. - * - * @return Properties of the file. - */ - public Map<String, String> properties() { - return props == null || props.isEmpty() ? 
Collections.<String, String>emptyMap() : - Collections.unmodifiableMap(props); - } - - /** - * Get lock ID. - * - * @return Lock ID if file is locked or {@code null} if file is free of locks. - */ - @Nullable public IgniteUuid lockId() { - return lockId; - } - - /** - * Get evict exclude flag. - * - * @return Evict exclude flag. - */ - public boolean evictExclude() { - return evictExclude; - } - - /** - * @return Original file path. This is a helper field used only in some operations like delete. - */ - public IgniteFsPath path() { - return path; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeGridUuid(out, id); - out.writeInt(blockSize); - out.writeLong(len); - U.writeStringMap(out, props); - U.writeGridUuid(out, lockId); - U.writeGridUuid(out, affKey); - out.writeObject(listing); - out.writeObject(fileMap); - out.writeLong(accessTime); - out.writeLong(modificationTime); - out.writeBoolean(evictExclude); - out.writeObject(path); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - id = U.readGridUuid(in); - blockSize = in.readInt(); - len = in.readLong(); - props = U.readStringMap(in); - lockId = U.readGridUuid(in); - affKey = U.readGridUuid(in); - listing = (Map<String, GridGgfsListingEntry>)in.readObject(); - fileMap = (GridGgfsFileMap)in.readObject(); - accessTime = in.readLong(); - modificationTime = in.readLong(); - evictExclude = in.readBoolean(); - path = (IgniteFsPath)in.readObject(); - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - return id.hashCode() ^ blockSize ^ (int)(len ^ (len >>> 32)) ^ (props == null ? 0 : props.hashCode()) ^ - (lockId == null ? 
0 : lockId.hashCode()); - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object obj) { - if (obj == this) - return true; - - if (obj == null || getClass() != obj.getClass()) - return false; - - GridGgfsFileInfo that = (GridGgfsFileInfo)obj; - - return id.equals(that.id) && blockSize == that.blockSize && len == that.len && F.eq(affKey, that.affKey) && - F.eq(props, that.props) && F.eq(lockId, that.lockId); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridGgfsFileInfo.class, this); - } - - /** - * Builder for {@link GridGgfsFileInfo}. - */ - @SuppressWarnings("PublicInnerClass") - public static class Builder { - /** Instance to build. */ - private final GridGgfsFileInfo info; - - /** - * Private constructor. - * - * @param info Instance to build. - */ - private Builder(GridGgfsFileInfo info) { - this.info = info; - } - - /** - * @param path A new path value. - * @return This builder instance (for chaining). - */ - public Builder path(IgniteFsPath path) { - info.path = path; - - return this; - } - - /** - * Finishes instance construction and returns a resulting - * unmodifiable instance. - * - * @return A constructed instance. - */ - public GridGgfsFileInfo build() { - return info; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsFileMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsFileMap.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsFileMap.java deleted file mode 100644 index 47e054c..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsFileMap.java +++ /dev/null @@ -1,361 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.ggfs; - -import org.apache.ignite.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.internal.util.tostring.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; - -import static org.gridgain.grid.kernal.processors.ggfs.GridGgfsFileAffinityRange.*; - -/** - * Auxiliary class that is responsible for managing file affinity keys allocation by ranges. - */ -public class GridGgfsFileMap implements Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - @GridToStringInclude - /** Sorted list of ranges in ascending order. */ - private List<GridGgfsFileAffinityRange> ranges; - - /** - * Empty constructor. - */ - public GridGgfsFileMap() { - // No-op. - } - - /** - * Constructs same file map as passed in. - * - * @param old Old map. - */ - public GridGgfsFileMap(@Nullable GridGgfsFileMap old) { - if (old != null && old.ranges != null) { - ranges = new ArrayList<>(old.ranges.size()); - - ranges.addAll(old.ranges); - } - } - - /** - * Gets affinity key from file map based on block start offset. - * - * @param blockOff Block start offset (divisible by block size). 
- * @param includeMoved If {@code true} then will return affinity key for ranges marked as moved. - * Otherwise will return null for such ranges. - * @return Affinity key of the range that contains the given offset, or {@code null} if there is no such range - * or if that range is marked as moved and {@code includeMoved} is {@code false}. - */ - public IgniteUuid affinityKey(long blockOff, boolean includeMoved) { - if (ranges == null) - return null; - - assert !ranges.isEmpty(); - - // Range binary search. - int leftIdx = 0, rightIdx = ranges.size() - 1; - - GridGgfsFileAffinityRange leftRange = ranges.get(leftIdx); - GridGgfsFileAffinityRange rightRange = ranges.get(rightIdx); - - // If block offset is less than start of first range, we don't have affinity key. - if (leftRange.less(blockOff)) - return null; - - if (leftRange.belongs(blockOff)) - return leftRange.status() != RANGE_STATUS_MOVED ? leftRange.affinityKey() : - includeMoved ? leftRange.affinityKey() : null; - - if (rightRange.greater(blockOff)) - return null; - - // A moved range must report its own affinity key, not the key of the first range. - if (rightRange.belongs(blockOff)) - return rightRange.status() != RANGE_STATUS_MOVED ? rightRange.affinityKey() : - includeMoved ? rightRange.affinityKey() : null; - - while (rightIdx - leftIdx > 1) { - int midIdx = (leftIdx + rightIdx) / 2; - - GridGgfsFileAffinityRange midRange = ranges.get(midIdx); - - // Same rule for a range found in the middle of the list: use the matched range's key. - if (midRange.belongs(blockOff)) - return midRange.status() != RANGE_STATUS_MOVED ? midRange.affinityKey() : - includeMoved ? midRange.affinityKey() : null; - - // If offset is less than block start, update right index. - if (midRange.less(blockOff)) - rightIdx = midIdx; - else { - assert midRange.greater(blockOff); - - leftIdx = midIdx; - } - } - - // Range was not found. - return null; - } - - /** - * Updates range status in file map. Will split range into two ranges if given range is a sub-range starting - * from the same offset. - * - * @param range Range to update status. - * @param status New range status. - * @throws IgniteCheckedException If range was not found. 
- */ - public void updateRangeStatus(GridGgfsFileAffinityRange range, int status) throws IgniteCheckedException { - if (ranges == null) - throw new GridGgfsInvalidRangeException("Failed to update range status (file map is empty) " + - "[range=" + range + ", ranges=" + ranges + ']'); - - assert !ranges.isEmpty(); - - // Check last. - int lastIdx = ranges.size() - 1; - - GridGgfsFileAffinityRange last = ranges.get(lastIdx); - - if (last.startOffset() == range.startOffset()) { - updateRangeStatus0(lastIdx, last, range, status); - - return; - } - - // Check first. - int firstIdx = 0; - - GridGgfsFileAffinityRange first = ranges.get(firstIdx); - - if (first.startOffset() == range.startOffset()) { - updateRangeStatus0(firstIdx, first, range, status); - - return; - } - - // Binary search. - while (lastIdx - firstIdx > 1) { - int midIdx = (firstIdx + lastIdx) / 2; - - GridGgfsFileAffinityRange midRange = ranges.get(midIdx); - - if (midRange.startOffset() == range.startOffset()) { - updateRangeStatus0(midIdx, midRange, range, status); - - return; - } - - // If range we are looking for is less - if (midRange.less(range.startOffset())) - lastIdx = midIdx; - else { - assert midRange.greater(range.startOffset()); - - firstIdx = midIdx; - } - } - - throw new GridGgfsInvalidRangeException("Failed to update map for range (corresponding map range " + - "was not found) [range=" + range + ", status=" + status + ", ranges=" + ranges + ']'); - } - - /** - * Deletes range from map. - * - * @param range Range to delete. - */ - public void deleteRange(GridGgfsFileAffinityRange range) throws IgniteCheckedException { - if (ranges == null) - throw new GridGgfsInvalidRangeException("Failed to remove range (file map is empty) " + - "[range=" + range + ", ranges=" + ranges + ']'); - - assert !ranges.isEmpty(); - - try { - // Check last. 
- int lastIdx = ranges.size() - 1; - - GridGgfsFileAffinityRange last = ranges.get(lastIdx); - - if (last.regionEqual(range)) { - assert last.status() == RANGE_STATUS_MOVED; - - ranges.remove(last); - - return; - } - - // Check first. - int firstIdx = 0; - - GridGgfsFileAffinityRange first = ranges.get(firstIdx); - - if (first.regionEqual(range)) { - assert first.status() == RANGE_STATUS_MOVED; - - ranges.remove(first); - - return; - } - - // Binary search. - while (lastIdx - firstIdx > 1) { - int midIdx = (firstIdx + lastIdx) / 2; - - GridGgfsFileAffinityRange midRange = ranges.get(midIdx); - - if (midRange.regionEqual(range)) { - assert midRange.status() == RANGE_STATUS_MOVED; - - ranges.remove(midIdx); - - return; - } - - // If range we are looking for is less - if (midRange.less(range.startOffset())) - lastIdx = midIdx; - else { - assert midRange.greater(range.startOffset()); - - firstIdx = midIdx; - } - } - } - finally { - if (ranges.isEmpty()) - ranges = null; - } - - throw new GridGgfsInvalidRangeException("Failed to remove range from file map (corresponding map range " + - "was not found) [range=" + range + ", ranges=" + ranges + ']'); - } - - /** - * Updates range status at given position (will split range into two if necessary). - * - * @param origIdx Original range index. - * @param orig Original range at index. - * @param update Range being updated. - * @param status New status for range. - */ - private void updateRangeStatus0(int origIdx, GridGgfsFileAffinityRange orig, GridGgfsFileAffinityRange update, - int status) { - assert F.eq(orig.affinityKey(), update.affinityKey()); - assert ranges.get(origIdx) == orig; - - if (orig.regionEqual(update)) - ranges.set(origIdx, new GridGgfsFileAffinityRange(update, status)); - else { - // If range was expanded, new one should be larger. 
- assert orig.endOffset() > update.endOffset(); - - ranges.set(origIdx, new GridGgfsFileAffinityRange(update, status)); - ranges.add(origIdx + 1, new GridGgfsFileAffinityRange(update.endOffset() + 1, orig.endOffset(), - orig.affinityKey())); - } - } - - /** - * Gets full list of ranges present in this map. - * - * @return Unmodifiable list of ranges. - */ - public List<GridGgfsFileAffinityRange> ranges() { - if (ranges == null) - return Collections.emptyList(); - - return Collections.unmodifiableList(ranges); - } - - /** - * Adds range to the list of already existing ranges. Added range must be located after - * the last range in this map. If added range is adjacent to the last range in the map, - * added range will be concatenated to the last one. - * - * @param range Range to add. - */ - public void addRange(GridGgfsFileAffinityRange range) { - if (range == null || range.empty()) - return; - - // We cannot add range in the middle of the file. - if (ranges == null) { - ranges = new ArrayList<>(); - - ranges.add(range); - - return; - } - - assert !ranges.isEmpty(); - - GridGgfsFileAffinityRange last = ranges.get(ranges.size() - 1); - - // Ensure that range being added is located to the right of last range in list. - assert last.greater(range.startOffset()) : "Cannot add range to middle of map [last=" + last + - ", range=" + range + ']'; - - // Try to concat last and new range. - GridGgfsFileAffinityRange concat = last.concat(range); - - // Simply add range to the end of the list if they are not adjacent. 
- if (concat == null) - ranges.add(range); - else - ranges.set(ranges.size() - 1, concat); - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - if (ranges == null) - out.writeInt(-1); - else { - assert !ranges.isEmpty(); - - out.writeInt(ranges.size()); - - for (GridGgfsFileAffinityRange range : ranges) - out.writeObject(range); - } - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - int size = in.readInt(); - - if (size > 0) { - ranges = new ArrayList<>(size); - - for (int i = 0; i < size; i++) - ranges.add((GridGgfsFileAffinityRange)in.readObject()); - } - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridGgfsFileMap.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsFileWorker.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsFileWorker.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsFileWorker.java deleted file mode 100644 index a70bd9e..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsFileWorker.java +++ /dev/null @@ -1,182 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.ggfs; - -import java.util.concurrent.*; -import java.util.concurrent.locks.*; - -/** - * GGFS file worker for DUAL modes. - */ -public class GridGgfsFileWorker extends GridGgfsThread { - /** Time during which thread remains alive since its last batch is finished. */ - private static final long THREAD_REUSE_WAIT_TIME = 5000; - - /** Lock */ - private final Lock lock = new ReentrantLock(); - - /** Condition. */ - private final Condition cond = lock.newCondition(); - - /** Next queued batch. */ - private GridGgfsFileWorkerBatch nextBatch; - - /** Batch which is currently being processed. */ - private GridGgfsFileWorkerBatch curBatch; - - /** Cancellation flag. */ - private volatile boolean cancelled; - - /** - * Creates {@code GGFS} file worker. - * - * @param name Worker name. - */ - GridGgfsFileWorker(String name) { - super(name); - } - - /** - * Add worker batch. - * - * @return {@code True} if the batch was actually added. - */ - boolean addBatch(GridGgfsFileWorkerBatch batch) { - assert batch != null; - - lock.lock(); - - try { - if (!cancelled) { - assert nextBatch == null; // Remember, that write operations on a single file are exclusive. - - nextBatch = batch; - - cond.signalAll(); - - return true; - } - else - return false; - } - finally { - lock.unlock(); - } - } - - /** {@inheritDoc} */ - @Override protected void body() throws InterruptedException { - while (!cancelled) { - lock.lock(); - - try { - // If there are no more new batches, wait for several seconds before shutting down the thread. 
- if (!cancelled && nextBatch == null) - cond.await(THREAD_REUSE_WAIT_TIME, TimeUnit.MILLISECONDS); - - curBatch = nextBatch; - - nextBatch = null; - - if (cancelled && curBatch != null) - curBatch.finish(); // Mark the batch as finished if cancelled. - } - finally { - lock.unlock(); - } - - if (curBatch != null) - curBatch.process(); - else { - lock.lock(); - - try { - // No more new batches, we can safely release the worker as it was inactive for too long. - if (nextBatch == null) - cancelled = true; - } - finally { - lock.unlock(); - } - } - } - } - - /** {@inheritDoc} */ - @Override protected void cleanup() { - assert cancelled; // Cleanup can only be performed on a cancelled worker. - - // Clear interrupted flag. - boolean interrupted = interrupted(); - - // Process the last batch if any. - if (nextBatch != null) - nextBatch.process(); - - onFinish(); - - // Reset interrupted flag. - if (interrupted) - interrupt(); - } - - /** - * Forcefully finish execution of all batches. - */ - void cancel() { - lock.lock(); - - try { - cancelled = true; - - if (curBatch != null) - curBatch.finish(); - - if (nextBatch != null) - nextBatch.finish(); - - cond.signalAll(); // Awake the main loop in case it is still waiting for the next batch. - } - finally { - lock.unlock(); - } - } - - /** - * Get current batch. - * - * @return Current batch. - */ - GridGgfsFileWorkerBatch currentBatch() { - lock.lock(); - - try { - return nextBatch == null ? curBatch : nextBatch; - } - finally { - lock.unlock(); - } - } - - /** - * Callback invoked when worker has processed all its batches. - */ - protected void onFinish() { - // No-op. 
- } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsFileWorkerBatch.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsFileWorkerBatch.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsFileWorkerBatch.java deleted file mode 100644 index 2e71db9..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsFileWorkerBatch.java +++ /dev/null @@ -1,235 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.ggfs; - -import org.apache.ignite.*; -import org.apache.ignite.fs.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.io.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; -import java.util.concurrent.locks.*; - -/** - * Work batch is an abstraction of the logically grouped tasks. - */ -public class GridGgfsFileWorkerBatch { - /** Completion latch. */ - private final CountDownLatch completeLatch = new CountDownLatch(1); - - /** Finish guard. 
*/ - private final AtomicBoolean finishGuard = new AtomicBoolean(); - - /** Lock for finish operation. */ - private final ReadWriteLock finishLock = new ReentrantReadWriteLock(); - - /** Tasks queue. */ - private final BlockingDeque<GridGgfsFileWorkerTask> queue = new LinkedBlockingDeque<>(); - - /** Path to the file in the primary file system. */ - private final IgniteFsPath path; - - /** Output stream to the file. */ - private final OutputStream out; - - /** Caught exception. */ - private volatile IgniteCheckedException err; - - /** Last task marker. */ - private boolean lastTask; - - /** - * Constructor. - * - * @param path Path to the file in the primary file system. - * @param out Output stream opened to that file. - */ - GridGgfsFileWorkerBatch(IgniteFsPath path, OutputStream out) { - assert path != null; - assert out != null; - - this.path = path; - this.out = out; - } - - /** - * Perform write. - * - * @param data Data to be written. - * @return {@code True} in case operation was enqueued. - */ - boolean write(final byte[] data) { - return addTask(new GridGgfsFileWorkerTask() { - @Override public void execute() throws IgniteCheckedException { - try { - out.write(data); - } - catch (IOException e) { - throw new IgniteCheckedException("Failed to write data to the file due to secondary file system " + - "exception: " + path, e); - } - } - }); - } - - /** - * Process the batch. 
- */ - void process() { - try { - boolean cancelled = false; - - while (!cancelled) { - try { - GridGgfsFileWorkerTask task = queue.poll(1000, TimeUnit.MILLISECONDS); - - if (task == null) - continue; - - task.execute(); - - if (lastTask) - cancelled = true; - } - catch (IgniteCheckedException e) { - err = e; - - cancelled = true; - } - catch (InterruptedException ignore) { - Thread.currentThread().interrupt(); - - cancelled = true; - } - } - } - finally { - try { - onComplete(); - } - finally { - U.closeQuiet(out); - - completeLatch.countDown(); - } - } - } - - /** - * Add the last task to that batch which will release all the resources. - */ - @SuppressWarnings("LockAcquiredButNotSafelyReleased") - void finish() { - if (finishGuard.compareAndSet(false, true)) { - finishLock.writeLock().lock(); - - try { - queue.add(new GridGgfsFileWorkerTask() { - @Override public void execute() { - assert queue.isEmpty(); - - lastTask = true; - } - }); - } - finally { - finishLock.writeLock().unlock(); - } - } - } - - /** - * Await for that worker batch to complete. - * - * @throws IgniteCheckedException In case any exception has occurred during batch tasks processing. - */ - void await() throws IgniteCheckedException { - try { - completeLatch.await(); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - - throw new IgniteInterruptedException(e); - } - - IgniteCheckedException err0 = err; - - if (err0 != null) - throw err0; - } - - /** - * Await for that worker batch to complete in case it was marked as finished. - * - * @throws IgniteCheckedException In case any exception has occurred during batch tasks processing. - */ - void awaitIfFinished() throws IgniteCheckedException { - if (finishGuard.get()) - await(); - } - - /** - * Get primary file system path. - * - * @return Primary file system path. - */ - IgniteFsPath path() { - return path; - } - - /** - * Callback invoked when all the tasks within the batch are completed. 
- */ - protected void onComplete() { - // No-op. - } - - /** - * Add task to the queue. - * - * @param task Task to add. - * @return {@code True} in case the task was added to the queue. - */ - private boolean addTask(GridGgfsFileWorkerTask task) { - finishLock.readLock().lock(); - - try { - if (!finishGuard.get()) { - try { - queue.put(task); - - return true; - } - catch (InterruptedException ignore) { - // Task was not enqueued due to interruption. - Thread.currentThread().interrupt(); - - return false; - } - } - else - return false; - - } - finally { - finishLock.readLock().unlock(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsFileWorkerTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsFileWorkerTask.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsFileWorkerTask.java deleted file mode 100644 index b194864..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsFileWorkerTask.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.ggfs; - -import org.apache.ignite.*; - -/** - * Generic GGFS worker task which could potentially throw an exception. - */ -public interface GridGgfsFileWorkerTask { - /** - * Execute task logic. - * - * @throws IgniteCheckedException If failed. - */ - public void execute() throws IgniteCheckedException; -}
