http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsDeleteWorker.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsDeleteWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsDeleteWorker.java new file mode 100644 index 0000000..7ef8a73 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsDeleteWorker.java @@ -0,0 +1,349 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.fs; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.events.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.internal.managers.communication.*; +import org.apache.ignite.internal.managers.eventstorage.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.internal.util.future.*; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.locks.*; + +import static org.apache.ignite.events.IgniteEventType.*; +import static org.apache.ignite.internal.GridTopic.*; + +/** + * GGFS worker for removal from the trash directory. + */ +public class GridGgfsDeleteWorker extends GridGgfsThread { + /** Awake frequency, */ + private static final long FREQUENCY = 1000; + + /** How many files/folders to delete at once (i.e in a single transaction). */ + private static final int MAX_DELETE_BATCH = 100; + + /** GGFS context. */ + private final GridGgfsContext ggfsCtx; + + /** Metadata manager. */ + private final GridGgfsMetaManager meta; + + /** Data manager. */ + private final GridGgfsDataManager data; + + /** Event manager. */ + private final GridEventStorageManager evts; + + /** Logger. */ + private final IgniteLogger log; + + /** Lock. */ + private final Lock lock = new ReentrantLock(); + + /** Condition. */ + private final Condition cond = lock.newCondition(); + + /** Force worker to perform actual delete. */ + private boolean force; + + /** Cancellation flag. */ + private volatile boolean cancelled; + + /** Message topic. */ + private Object topic; + + /** + * Constructor. + * + * @param ggfsCtx GGFS context. + */ + GridGgfsDeleteWorker(GridGgfsContext ggfsCtx) { + super("ggfs-delete-worker%" + ggfsCtx.ggfs().name() + "%" + ggfsCtx.kernalContext().localNodeId() + "%"); + + this.ggfsCtx = ggfsCtx; + + meta = ggfsCtx.meta(); + data = ggfsCtx.data(); + + evts = ggfsCtx.kernalContext().event(); + + String ggfsName = ggfsCtx.ggfs().name(); + + topic = F.isEmpty(ggfsName) ? TOPIC_GGFS : TOPIC_GGFS.topic(ggfsName); + + assert meta != null; + assert data != null; + + log = ggfsCtx.kernalContext().log(GridGgfsDeleteWorker.class); + } + + /** {@inheritDoc} */ + @Override protected void body() throws InterruptedException { + if (log.isDebugEnabled()) + log.debug("Delete worker started."); + + while (!cancelled) { + lock.lock(); + + try { + if (!cancelled && !force) + cond.await(FREQUENCY, TimeUnit.MILLISECONDS); + + force = false; // Reset force flag. + } + finally { + lock.unlock(); + } + + if (!cancelled) + delete(); + } + } + + /** + * Notify the worker that new entry to delete appeared. + */ + void signal() { + lock.lock(); + + try { + force = true; + + cond.signalAll(); + } + finally { + lock.unlock(); + } + } + + void cancel() { + cancelled = true; + + interrupt(); + } + + /** + * Perform cleanup of the trash directory. + */ + private void delete() { + GridGgfsFileInfo info = null; + + try { + info = meta.info(TRASH_ID); + } + catch (IgniteCheckedException e) { + U.error(log, "Cannot obtain trash directory info.", e); + } + + if (info != null) { + for (Map.Entry<String, GridGgfsListingEntry> entry : info.listing().entrySet()) { + IgniteUuid fileId = entry.getValue().fileId(); + + if (log.isDebugEnabled()) + log.debug("Deleting GGFS trash entry [name=" + entry.getKey() + ", fileId=" + fileId + ']'); + + try { + if (!cancelled) { + if (delete(entry.getKey(), fileId)) { + if (log.isDebugEnabled()) + log.debug("Sending delete confirmation message [name=" + entry.getKey() + + ", fileId=" + fileId + ']'); + + sendDeleteMessage(new GridGgfsDeleteMessage(fileId)); + } + } + else + break; + } + catch (IgniteInterruptedException ignored) { + // Ignore this exception while stopping. + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to delete entry from the trash directory: " + entry.getKey(), e); + + sendDeleteMessage(new GridGgfsDeleteMessage(fileId, e)); + } + } + } + } + + /** + * Remove particular entry from the TRASH directory. + * + * @param name Entry name. + * @param id Entry ID. + * @return {@code True} in case the entry really was deleted form the file system by this call. + * @throws IgniteCheckedException If failed. + */ + private boolean delete(String name, IgniteUuid id) throws IgniteCheckedException { + assert name != null; + assert id != null; + + while (true) { + GridGgfsFileInfo info = meta.info(id); + + if (info != null) { + if (info.isDirectory()) { + deleteDirectory(TRASH_ID, id); + + if (meta.delete(TRASH_ID, name, id)) + return true; + } + else { + assert info.isFile(); + + // Delete file content first. + // In case this node crashes, other node will re-delete the file. + data.delete(info).get(); + + boolean ret = meta.delete(TRASH_ID, name, id); + + if (evts.isRecordable(EVT_GGFS_FILE_PURGED)) { + if (info.path() != null) + evts.record(new IgniteFsEvent(info.path(), + ggfsCtx.kernalContext().discovery().localNode(), EVT_GGFS_FILE_PURGED)); + else + LT.warn(log, null, "Removing file without path info: " + info); + } + + return ret; + } + } + else + return false; // Entry was deleted concurrently. + } + } + + /** + * Remove particular entry from the trash directory or subdirectory. + * + * @param parentId Parent ID. + * @param id Entry id. + * @throws IgniteCheckedException If delete failed for some reason. + */ + private void deleteDirectory(IgniteUuid parentId, IgniteUuid id) throws IgniteCheckedException { + assert parentId != null; + assert id != null; + + while (true) { + GridGgfsFileInfo info = meta.info(id); + + if (info != null) { + assert info.isDirectory(); + + Map<String, GridGgfsListingEntry> listing = info.listing(); + + if (listing.isEmpty()) + return; // Directory is empty. + + Map<String, GridGgfsListingEntry> delListing; + + if (listing.size() <= MAX_DELETE_BATCH) + delListing = listing; + else { + delListing = new HashMap<>(MAX_DELETE_BATCH, 1.0f); + + int i = 0; + + for (Map.Entry<String, GridGgfsListingEntry> entry : listing.entrySet()) { + delListing.put(entry.getKey(), entry.getValue()); + + if (++i == MAX_DELETE_BATCH) + break; + } + } + + GridCompoundFuture<Object, ?> fut = new GridCompoundFuture<>(ggfsCtx.kernalContext()); + + // Delegate to child folders. + for (GridGgfsListingEntry entry : delListing.values()) { + if (!cancelled) { + if (entry.isDirectory()) + deleteDirectory(id, entry.fileId()); + else { + GridGgfsFileInfo fileInfo = meta.info(entry.fileId()); + + if (fileInfo != null) { + assert fileInfo.isFile(); + + fut.add(data.delete(fileInfo)); + } + } + } + else + return; + } + + fut.markInitialized(); + + // Wait for data cache to delete values before clearing meta cache. + try { + fut.get(); + } + catch (IgniteFutureCancelledException ignore) { + // This future can be cancelled only due to GGFS shutdown. + cancelled = true; + + return; + } + + // Actual delete of folder content. + Collection<IgniteUuid> delIds = meta.delete(id, delListing); + + if (delListing == listing && delListing.size() == delIds.size()) + break; // All entries were deleted. + } + else + break; // Entry was deleted concurrently. + } + } + + /** + * Send delete message to all meta cache nodes in the grid. + * + * @param msg Message to send. + */ + private void sendDeleteMessage(GridGgfsDeleteMessage msg) { + assert msg != null; + + Collection<ClusterNode> nodes = meta.metaCacheNodes(); + + boolean first = true; + + for (ClusterNode node : nodes) { + GridGgfsCommunicationMessage msg0 = first ? msg : (GridGgfsCommunicationMessage)msg.clone(); + + first = false; + + try { + ggfsCtx.send(node, topic, msg0, GridIoPolicy.SYSTEM_POOL); + } + catch (IgniteCheckedException e) { + U.warn(log, "Failed to send GGFS delete message to node [nodeId=" + node.id() + + ", msg=" + msg + ", err=" + e.getMessage() + ']'); + } + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsDirectoryNotEmptyException.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsDirectoryNotEmptyException.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsDirectoryNotEmptyException.java new file mode 100644 index 0000000..e60f39f --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsDirectoryNotEmptyException.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.fs; + +import org.apache.ignite.fs.*; + +/** + * Exception indicating that directory can not be deleted because it is not empty. + */ +public class GridGgfsDirectoryNotEmptyException extends IgniteFsException { + /** */ + private static final long serialVersionUID = 0L; + + /** + * @param msg Exception message. + */ + public GridGgfsDirectoryNotEmptyException(String msg) { + super(msg); + } + + /** + * Creates an instance of GGFS exception caused by nested exception. + * + * @param cause Exception cause. + */ + public GridGgfsDirectoryNotEmptyException(Throwable cause) { + super(cause); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsEx.java new file mode 100644 index 0000000..154a641 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsEx.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.fs; + +import org.apache.ignite.*; +import org.apache.ignite.fs.*; +import org.apache.ignite.lang.*; +import org.jetbrains.annotations.*; + +import java.net.*; + +/** + * Internal API extension for {@link org.apache.ignite.IgniteFs}. + */ +public interface GridGgfsEx extends IgniteFs { + /** + * Stops GGFS cleaning all used resources. + */ + public void stop(); + + /** + * @return GGFS context. + */ + public GridGgfsContext context(); + + /** + * Get handshake message. + * + * @return Handshake message. + */ + public GridGgfsPaths proxyPaths(); + + /** {@inheritDoc} */ + @Override GridGgfsInputStreamAdapter open(IgniteFsPath path, int bufSize, int seqReadsBeforePrefetch) + throws IgniteCheckedException; + + /** {@inheritDoc} */ + @Override GridGgfsInputStreamAdapter open(IgniteFsPath path) throws IgniteCheckedException; + + /** {@inheritDoc} */ + @Override GridGgfsInputStreamAdapter open(IgniteFsPath path, int bufSize) throws IgniteCheckedException; + + /** + * Gets global space counters. + * + * @return Tuple in which first component is used space on all nodes, + * second is available space on all nodes. + * @throws IgniteCheckedException If task execution failed. + */ + public GridGgfsStatus globalSpace() throws IgniteCheckedException; + + /** + * Enables, disables or clears sampling flag. + * + * @param val {@code True} to turn on sampling, {@code false} to turn it off, {@code null} to clear sampling state. + * @throws IgniteCheckedException If failed. + */ + public void globalSampling(@Nullable Boolean val) throws IgniteCheckedException; + + /** + * Get sampling state. + * + * @return {@code True} in case sampling is enabled, {@code false} otherwise, or {@code null} in case sampling + * flag is not set. + */ + @Nullable public Boolean globalSampling(); + + /** + * Get local metrics. + * + * @return Local metrics. + */ + public GridGgfsLocalMetrics localMetrics(); + + /** + * Gets group block size, i.e. block size multiplied by group size in affinity mapper. + * + * @return Group block size. + */ + public long groupBlockSize(); + + /** + * Asynchronously await for all entries existing in trash to be removed. + * + * @return Future which will be completed when all entries existed in trash by the time of invocation are removed. + * @throws IgniteCheckedException If failed. + */ + public IgniteFuture<?> awaitDeletesAsync() throws IgniteCheckedException; + + /** + * Gets client file system log directory. + * + * @return Client file system log directory or {@code null} in case no client connections have been created yet. + */ + @Nullable public String clientLogDirectory(); + + /** + * Sets client file system log directory. + * + * @param logDir Client file system log directory. + */ + public void clientLogDirectory(String logDir); + + /** + * Whether this path is excluded from evictions. + * + * @param path Path. + * @param primary Whether the mode is PRIMARY. + * @return {@code True} if path is excluded from evictions. + */ + public boolean evictExclude(IgniteFsPath path, boolean primary); + + /** + * Get next affinity key. + * + * @return Next affinity key. + */ + public IgniteUuid nextAffinityKey(); + + /** + * Check whether the given path is proxy path. + * + * @param path Path. + * @return {@code True} if proxy. + */ + public boolean isProxy(URI path); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsFileAffinityRange.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsFileAffinityRange.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsFileAffinityRange.java new file mode 100644 index 0000000..3474ac0 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsFileAffinityRange.java @@ -0,0 +1,396 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.fs; + +import org.apache.ignite.lang.*; +import org.apache.ignite.internal.util.direct.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.nio.*; +import java.util.*; + +/** + * Affinity range. + */ +public class GridGgfsFileAffinityRange extends GridTcpCommunicationMessageAdapter implements Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** Initial range status, right after creation. */ + public static final int RANGE_STATUS_INITIAL = 0; + + /** Moving range state. Fragmentizer started blocks copy. */ + public static final int RANGE_STATUS_MOVING = 1; + + /** Fragmentizer finished block copy for this range. */ + public static final int RANGE_STATUS_MOVED = 2; + + /** Range affinity key. */ + private IgniteUuid affKey; + + /** {@code True} if currently being moved by fragmentizer. */ + @SuppressWarnings("RedundantFieldInitialization") + private int status = RANGE_STATUS_INITIAL; + + /** Range start offset (divisible by block size). */ + private long startOff; + + /** Range end offset (endOff + 1 divisible by block size). */ + private long endOff; + + /** Transient flag indicating no further writes should be made to this range. */ + private boolean done; + + /** + * Empty constructor required by {@link Externalizable}. + */ + public GridGgfsFileAffinityRange() { + // No-op. + } + + /** + * @param startOff Start offset. + * @param endOff End offset. + * @param affKey Affinity key. + */ + GridGgfsFileAffinityRange(long startOff, long endOff, IgniteUuid affKey) { + this.startOff = startOff; + this.endOff = endOff; + this.affKey = affKey; + } + + /** + * Creates new range with updated status. + * + * @param other Initial range. + * @param status Updated status. + */ + GridGgfsFileAffinityRange(GridGgfsFileAffinityRange other, int status) { + startOff = other.startOff; + endOff = other.endOff; + affKey = other.affKey; + + this.status = status; + } + + /** + * @return Affinity key for this range. + */ + public IgniteUuid affinityKey() { + return affKey; + } + + /** + * @return Range start offset. + */ + public long startOffset() { + return startOff; + } + + /** + * @return Range end offset. + */ + public long endOffset() { + return endOff; + } + + /** + * @param blockStartOff Block start offset to check. + * @return {@code True} if block with given start offset belongs to this range. + */ + public boolean belongs(long blockStartOff) { + return blockStartOff >= startOff && blockStartOff < endOff; + } + + /** + * @param blockStartOff Block start offset to check. + * @return {@code True} if block with given start offset is located before this range. + */ + public boolean less(long blockStartOff) { + return blockStartOff < startOff; + } + + /** + * @param blockStartOff Block start offset to check. + * @return {@code True} if block with given start offset is located after this range. + */ + public boolean greater(long blockStartOff) { + return blockStartOff > endOff; + } + + /** + * @return If range is empty, i.e. has zero length. + */ + public boolean empty() { + return startOff == endOff; + } + + /** + * @return Range status. + */ + public int status() { + return status; + } + + /** + * Expands this range by given block. + * + * @param blockStartOff Offset of block start. + * @param expansionSize Block size. + */ + public void expand(long blockStartOff, int expansionSize) { + // If we are expanding empty range. + if (endOff == startOff) { + assert endOff == blockStartOff : "Failed to expand range [endOff=" + endOff + + ", blockStartOff=" + blockStartOff + ", expansionSize=" + expansionSize + ']'; + + endOff += expansionSize - 1; + } + else { + assert endOff == blockStartOff - 1; + + endOff += expansionSize; + } + } + + /** + * Splits range into collection if smaller ranges with length equal to {@code maxSize}. + * + * @param maxSize Split part maximum size. + * @return Collection of range parts. + */ + public Collection<GridGgfsFileAffinityRange> split(long maxSize) { + long len = endOff - startOff + 1; + + if (len > maxSize) { + int size = (int)(len / maxSize + 1); + + Collection<GridGgfsFileAffinityRange> res = new ArrayList<>(size); + + long pos = startOff; + + while (pos < endOff + 1) { + long end = Math.min(pos + maxSize - 1, endOff); + + GridGgfsFileAffinityRange part = new GridGgfsFileAffinityRange(pos, end, affKey); + + part.status = status; + + res.add(part); + + pos = end + 1; + } + + return res; + } + else + return Collections.singletonList(this); + } + + /** + * Tries to concatenate this range with a given one. If ranges are not adjacent, will return {@code null}. + * + * @param range Range to concatenate with. + * @return Concatenation result or {@code null} if ranges are not adjacent. + */ + @Nullable public GridGgfsFileAffinityRange concat(GridGgfsFileAffinityRange range) { + if (endOff + 1 != range.startOff || !F.eq(affKey, range.affKey) || status != RANGE_STATUS_INITIAL) + return null; + + return new GridGgfsFileAffinityRange(startOff, range.endOff, affKey); + } + + /** + * Marks this range as done. + */ + public void markDone() { + done = true; + } + + /** + * @return Done flag. + */ + public boolean done() { + return done; + } + + /** + * Checks if range regions are equal. + * + * @param other Other range to check against. + * @return {@code True} if range regions are equal. + */ + public boolean regionEqual(GridGgfsFileAffinityRange other) { + return startOff == other.startOff && endOff == other.endOff; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + U.writeGridUuid(out, affKey); + + out.writeInt(status); + + out.writeLong(startOff); + out.writeLong(endOff); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + affKey = U.readGridUuid(in); + + status = in.readInt(); + + startOff = in.readLong(); + endOff = in.readLong(); + } + + /** {@inheritDoc} */ + @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"}) + @Override public GridTcpCommunicationMessageAdapter clone() { + GridGgfsFileAffinityRange _clone = new GridGgfsFileAffinityRange(); + + clone0(_clone); + + return _clone; + } + + /** {@inheritDoc} */ + @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) { + GridGgfsFileAffinityRange _clone = (GridGgfsFileAffinityRange)_msg; + + _clone.affKey = affKey; + _clone.status = status; + _clone.startOff = startOff; + _clone.endOff = endOff; + _clone.done = done; + } + + /** {@inheritDoc} */ + @SuppressWarnings("fallthrough") + @Override public boolean writeTo(ByteBuffer buf) { + commState.setBuffer(buf); + + if (!commState.typeWritten) { + if (!commState.putByte(directType())) + return false; + + commState.typeWritten = true; + } + + switch (commState.idx) { + case 0: + if (!commState.putGridUuid(affKey)) + return false; + + commState.idx++; + + case 1: + if (!commState.putBoolean(done)) + return false; + + commState.idx++; + + case 2: + if (!commState.putLong(endOff)) + return false; + + commState.idx++; + + case 3: + if (!commState.putLong(startOff)) + return false; + + commState.idx++; + + case 4: + if (!commState.putInt(status)) + return false; + + commState.idx++; + + } + + return true; + } + + /** {@inheritDoc} */ + @SuppressWarnings("fallthrough") + @Override public boolean readFrom(ByteBuffer buf) { + commState.setBuffer(buf); + + switch (commState.idx) { + case 0: + IgniteUuid affKey0 = commState.getGridUuid(); + + if (affKey0 == GRID_UUID_NOT_READ) + return false; + + affKey = affKey0; + + commState.idx++; + + case 1: + if (buf.remaining() < 1) + return false; + + done = commState.getBoolean(); + + commState.idx++; + + case 2: + if (buf.remaining() < 8) + return false; + + endOff = commState.getLong(); + + commState.idx++; + + case 3: + if (buf.remaining() < 8) + return false; + + startOff = commState.getLong(); + + commState.idx++; + + case 4: + if (buf.remaining() < 4) + return false; + + status = commState.getInt(); + + commState.idx++; + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return 69; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridGgfsFileAffinityRange.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsFileInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsFileInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsFileInfo.java new file mode 100644 index 0000000..f4c61e5 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsFileInfo.java @@ -0,0 +1,568 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.fs; + +import org.apache.ignite.fs.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.internal.util.tostring.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; + +/** + * Unmodifiable file information. + */ +public final class GridGgfsFileInfo implements Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** ID for the root directory. */ + public static final IgniteUuid ROOT_ID = new IgniteUuid(new UUID(0, 0), 0); + + /** ID of the trash directory. */ + public static final IgniteUuid TRASH_ID = new IgniteUuid(new UUID(0, 1), 0); + + /** Info ID. */ + private IgniteUuid id; + + /** File length in bytes. */ + private long len; + + /** File block size, {@code zero} for directories. */ + private int blockSize; + + /** File properties. */ + private Map<String, String> props; + + /** File lock ID. */ + private IgniteUuid lockId; + + /** Affinity key used for single-node file collocation. */ + private IgniteUuid affKey; + + /** File affinity map. */ + private GridGgfsFileMap fileMap; + + /** Last access time. Modified on-demand. */ + private long accessTime; + + /** Last modification time. */ + private long modificationTime; + + /** Directory listing. */ + @GridToStringInclude + private Map<String, GridGgfsListingEntry> listing; + + /** Whether data blocks of this entry should never be excluded. */ + private boolean evictExclude; + + /** + * Original file path. This is a helper field used only in some + * operations like delete. + */ + private IgniteFsPath path; + + /** + * {@link Externalizable} support. + */ + public GridGgfsFileInfo() { + this(ROOT_ID); + } + + /** + * Constructs directory file info with the given ID. + * + * @param id ID. + */ + GridGgfsFileInfo(IgniteUuid id) { + this(true, id, 0, 0, null, null, null, null, false, System.currentTimeMillis(), false); + } + + /** + * Constructs directory or file info with {@link org.apache.ignite.fs.IgniteFsConfiguration#DFLT_BLOCK_SIZE default} block size. + * + * @param isDir Constructs directory info if {@code true} or file info if {@code false}. + * @param props Meta properties to set. + */ + public GridGgfsFileInfo(boolean isDir, @Nullable Map<String, String> props) { + this(isDir, null, isDir ? 0 : IgniteFsConfiguration.DFLT_BLOCK_SIZE, 0, null, null, props, null, false, + System.currentTimeMillis(), false); + } + + /** + * Consturcts directory with random ID and provided listing. + * + * @param listing Listing. + */ + GridGgfsFileInfo(Map<String, GridGgfsListingEntry> listing) { + this(true, null, 0, 0, null, listing, null, null, false, System.currentTimeMillis(), false); + } + + /** + * Constructs file info. + * + * @param blockSize Block size. + * @param affKey Affinity key. + * @param evictExclude Eviction exclude flag. + * @param props File properties. + */ + GridGgfsFileInfo(int blockSize, @Nullable IgniteUuid affKey, boolean evictExclude, + @Nullable Map<String, String> props) { + this(false, null, blockSize, 0, affKey, null, props, null, true, System.currentTimeMillis(), evictExclude); + } + + /** + * Constructs file info. + * + * @param blockSize Block size. + * @param len Length. + * @param affKey Affinity key. + * @param lockId Lock ID. + * @param props Properties. + * @param evictExclude Evict exclude flag. + */ + public GridGgfsFileInfo(int blockSize, long len, @Nullable IgniteUuid affKey, @Nullable IgniteUuid lockId, + boolean evictExclude, @Nullable Map<String, String> props) { + this(false, null, blockSize, len, affKey, null, props, lockId, true, System.currentTimeMillis(), evictExclude); + } + + /** + * Constructs file information. + * + * @param info File information to copy data from. + * @param len Size of a file. + */ + GridGgfsFileInfo(GridGgfsFileInfo info, long len) { + this(info.isDirectory(), info.id, info.blockSize, len, info.affKey, info.listing, info.props, info.fileMap(), + info.lockId, true, info.accessTime, info.modificationTime, info.evictExclude()); + } + + /** + * Constructs file info. + * + * @param info File info. + * @param accessTime Last access time. + * @param modificationTime Last modification time. + */ + GridGgfsFileInfo(GridGgfsFileInfo info, long accessTime, long modificationTime) { + this(info.isDirectory(), info.id, info.blockSize, info.len, info.affKey, info.listing, info.props, + info.fileMap(), info.lockId, false, accessTime, modificationTime, info.evictExclude()); + } + + /** + * Constructs file information. + * + * @param info File information to copy data from. + * @param props File properties to set. + */ + GridGgfsFileInfo(GridGgfsFileInfo info, @Nullable Map<String, String> props) { + this(info.isDirectory(), info.id, info.blockSize, info.len, info.affKey, info.listing, props, + info.fileMap(), info.lockId, true, info.accessTime, info.modificationTime, info.evictExclude()); + } + + /** + * Constructs file info. + * + * @param blockSize Block size, + * @param len Size of a file. + * @param props File properties to set. + * @param evictExclude Evict exclude flag. + */ + GridGgfsFileInfo(int blockSize, long len, boolean evictExclude, @Nullable Map<String, String> props) { + this(false, null, blockSize, len, null, null, props, null, true, System.currentTimeMillis(), evictExclude); + } + + /** + * Constructs file information. + * + * @param info File information to copy data from. + * @param lockId Lock ID. + * @param modificationTime Last modification time. + */ + GridGgfsFileInfo(GridGgfsFileInfo info, @Nullable IgniteUuid lockId, long modificationTime) { + this(info.isDirectory(), info.id, info.blockSize, info.len, info.affKey, info.listing, info.props, + info.fileMap(), lockId, true, info.accessTime, modificationTime, info.evictExclude()); + } + + /** + * Constructs file info. + * + * @param listing New directory listing. + * @param old Old file info. + */ + GridGgfsFileInfo(Map<String, GridGgfsListingEntry> listing, GridGgfsFileInfo old) { + this(old.isDirectory(), old.id, old.blockSize, old.len, old.affKey, listing, old.props, old.fileMap(), + old.lockId, false, old.accessTime, old.modificationTime, old.evictExclude()); + } + + /** + * Constructs file info. + * + * @param isDir Constructs directory info if {@code true} or file info if {@code false}. + * @param id ID or {@code null} to generate it automatically. + * @param blockSize Block size. + * @param len Size of a file. + * @param affKey Affinity key for data blocks. + * @param listing Directory listing. + * @param props File properties. + * @param lockId Lock ID. + * @param cpProps Flag to copy properties map. + * @param modificationTime Last modification time. + * @param evictExclude Evict exclude flag. + */ + private GridGgfsFileInfo(boolean isDir, @Nullable IgniteUuid id, int blockSize, long len, @Nullable IgniteUuid affKey, + @Nullable Map<String, GridGgfsListingEntry> listing, @Nullable Map<String, String> props, + @Nullable IgniteUuid lockId, boolean cpProps, long modificationTime, boolean evictExclude) { + this(isDir, id, blockSize, len, affKey, listing, props, null, lockId, cpProps, modificationTime, + modificationTime, evictExclude); + } + + /** + * Constructs file info. + * + * @param isDir Constructs directory info if {@code true} or file info if {@code false}. + * @param id ID or {@code null} to generate it automatically. + * @param blockSize Block size. + * @param len Size of a file. + * @param affKey Affinity key for data blocks. + * @param listing Directory listing. + * @param props File properties. + * @param fileMap File map. + * @param lockId Lock ID. + * @param cpProps Flag to copy properties map. + * @param accessTime Last access time. + * @param modificationTime Last modification time. + * @param evictExclude Evict exclude flag. + */ + private GridGgfsFileInfo(boolean isDir, @Nullable IgniteUuid id, int blockSize, long len, @Nullable IgniteUuid affKey, + @Nullable Map<String, GridGgfsListingEntry> listing, @Nullable Map<String, String> props, + @Nullable GridGgfsFileMap fileMap, @Nullable IgniteUuid lockId, boolean cpProps, long accessTime, + long modificationTime, boolean evictExclude) { + assert F.isEmpty(listing) || isDir; + + if (isDir) { + assert len == 0 : "Directory length should be zero: " + len; + assert blockSize == 0 : "Directory block size should be zero: " + blockSize; + } + else { + assert len >= 0 : "File length cannot be negative: " + len; + assert blockSize > 0 : "File block size should be positive: " + blockSize; + } + + this.id = id == null ? IgniteUuid.randomUuid() : id; + this.len = isDir ? 0 : len; + this.blockSize = isDir ? 0 : blockSize; + this.affKey = affKey; + this.listing = listing; + + if (fileMap == null && !isDir) + fileMap = new GridGgfsFileMap(); + + this.fileMap = fileMap; + this.accessTime = accessTime; + this.modificationTime = modificationTime; + + // Always make a copy of passed properties collection to escape concurrent modifications. + this.props = props == null || props.isEmpty() ? null : + cpProps ? new GridLeanMap<>(props) : props; + + if (listing == null && isDir) + this.listing = Collections.emptyMap(); + + this.lockId = lockId; + this.evictExclude = evictExclude; + } + + /** + * A copy constructor, which takes all data from the specified + * object field-by-field. + * + * @param info An object to copy data info. + */ + public GridGgfsFileInfo(GridGgfsFileInfo info) { + this(info.isDirectory(), info.id, info.blockSize, info.len, info.affKey, info.listing, info.props, + info.fileMap(), info.lockId, true, info.accessTime, info.modificationTime, info.evictExclude()); + } + + /** + * Creates a builder for the new instance of file info. + * + * @return A builder to construct a new unmodifiable instance + * of this class. + */ + public static Builder builder() { + return new Builder(new GridGgfsFileInfo()); + } + + /** + * Creates a builder for the new instance of file info, + * based on the specified origin. + * + * @param origin An origin for new instance, from which + * the data will be copied. + * @return A builder to construct a new unmodifiable instance + * of this class. + */ + public static Builder builder(GridGgfsFileInfo origin) { + return new Builder(new GridGgfsFileInfo(origin)); + } + + /** + * Gets this item ID. + * + * @return This item ID. + */ + public IgniteUuid id() { + return id; + } + + /** + * @return {@code True} if this is a file. + */ + public boolean isFile() { + return blockSize > 0; + } + + /** + * @return {@code True} if this is a directory. + */ + public boolean isDirectory() { + return blockSize == 0; + } + + /** + * Get file size. + * + * @return File size. + */ + public long length() { + assert isFile(); + + return len; + } + + /** + * Get single data block size to store this file. + * + * @return Single data block size to store this file. + */ + public int blockSize() { + assert isFile(); + + return blockSize; + } + + /** + * @return Number of data blocks to store this file. + */ + public long blocksCount() { + assert isFile(); + + return (len + blockSize() - 1) / blockSize(); + } + + /** + * @return Last access time. + */ + public long accessTime() { + return accessTime; + } + + /** + * @return Last modification time. + */ + public long modificationTime() { + return modificationTime; + } + + /** + * @return Directory listing. + */ + public Map<String, GridGgfsListingEntry> listing() { + // Always wrap into unmodifiable map to be able to avoid illegal modifications in order pieces of the code. + if (isFile()) + return Collections.unmodifiableMap(Collections.<String, GridGgfsListingEntry>emptyMap()); + + assert listing != null; + + return Collections.unmodifiableMap(listing); + } + + /** + * @return Affinity key used for single-node file collocation. If {@code null}, usual + * mapper procedure is used for block affinity detection. + */ + @Nullable public IgniteUuid affinityKey() { + return affKey; + } + + /** + * @param affKey Affinity key used for single-node file collocation. + */ + public void affinityKey(IgniteUuid affKey) { + this.affKey = affKey; + } + + /** + * @return File affinity map. + */ + public GridGgfsFileMap fileMap() { + return fileMap; + } + + /** + * @param fileMap File affinity map. + */ + public void fileMap(GridGgfsFileMap fileMap) { + this.fileMap = fileMap; + } + + /** + * Get properties of the file. + * + * @return Properties of the file. + */ + public Map<String, String> properties() { + return props == null || props.isEmpty() ? Collections.<String, String>emptyMap() : + Collections.unmodifiableMap(props); + } + + /** + * Get lock ID. + * + * @return Lock ID if file is locked or {@code null} if file is free of locks. + */ + @Nullable public IgniteUuid lockId() { + return lockId; + } + + /** + * Get evict exclude flag. + * + * @return Evict exclude flag. + */ + public boolean evictExclude() { + return evictExclude; + } + + /** + * @return Original file path. This is a helper field used only in some operations like delete. + */ + public IgniteFsPath path() { + return path; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + U.writeGridUuid(out, id); + out.writeInt(blockSize); + out.writeLong(len); + U.writeStringMap(out, props); + U.writeGridUuid(out, lockId); + U.writeGridUuid(out, affKey); + out.writeObject(listing); + out.writeObject(fileMap); + out.writeLong(accessTime); + out.writeLong(modificationTime); + out.writeBoolean(evictExclude); + out.writeObject(path); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + id = U.readGridUuid(in); + blockSize = in.readInt(); + len = in.readLong(); + props = U.readStringMap(in); + lockId = U.readGridUuid(in); + affKey = U.readGridUuid(in); + listing = (Map<String, GridGgfsListingEntry>)in.readObject(); + fileMap = (GridGgfsFileMap)in.readObject(); + accessTime = in.readLong(); + modificationTime = in.readLong(); + evictExclude = in.readBoolean(); + path = (IgniteFsPath)in.readObject(); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return id.hashCode() ^ blockSize ^ (int)(len ^ (len >>> 32)) ^ (props == null ? 0 : props.hashCode()) ^ + (lockId == null ? 0 : lockId.hashCode()); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object obj) { + if (obj == this) + return true; + + if (obj == null || getClass() != obj.getClass()) + return false; + + GridGgfsFileInfo that = (GridGgfsFileInfo)obj; + + return id.equals(that.id) && blockSize == that.blockSize && len == that.len && F.eq(affKey, that.affKey) && + F.eq(props, that.props) && F.eq(lockId, that.lockId); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridGgfsFileInfo.class, this); + } + + /** + * Builder for {@link GridGgfsFileInfo}. + */ + @SuppressWarnings("PublicInnerClass") + public static class Builder { + /** Instance to build. */ + private final GridGgfsFileInfo info; + + /** + * Private constructor. + * + * @param info Instance to build. + */ + private Builder(GridGgfsFileInfo info) { + this.info = info; + } + + /** + * @param path A new path value. + * @return This builder instance (for chaining). + */ + public Builder path(IgniteFsPath path) { + info.path = path; + + return this; + } + + /** + * Finishes instance construction and returns a resulting + * unmodifiable instance. + * + * @return A constructed instance. + */ + public GridGgfsFileInfo build() { + return info; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsFileMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsFileMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsFileMap.java new file mode 100644 index 0000000..248e37c --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsFileMap.java @@ -0,0 +1,359 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.fs; + +import org.apache.ignite.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; + +/** + * Auxiliary class that is responsible for managing file affinity keys allocation by ranges. + */ +public class GridGgfsFileMap implements Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + @GridToStringInclude + /** Sorted list of ranges in ascending order. */ + private List<GridGgfsFileAffinityRange> ranges; + + /** + * Empty constructor. + */ + public GridGgfsFileMap() { + // No-op. + } + + /** + * Constructs same file map as passed in. + * + * @param old Old map. + */ + public GridGgfsFileMap(@Nullable GridGgfsFileMap old) { + if (old != null && old.ranges != null) { + ranges = new ArrayList<>(old.ranges.size()); + + ranges.addAll(old.ranges); + } + } + + /** + * Gets affinity key from file map based on block start offset. + * + * @param blockOff Block start offset (divisible by block size). + * @param includeMoved If {@code true} then will return affinity key for ranges marked as moved. + * Otherwise will return null for such ranges. + * @return Affinity key. + */ + public IgniteUuid affinityKey(long blockOff, boolean includeMoved) { + if (ranges == null) + return null; + + assert !ranges.isEmpty(); + + // Range binary search. + int leftIdx = 0, rightIdx = ranges.size() - 1; + + GridGgfsFileAffinityRange leftRange = ranges.get(leftIdx); + GridGgfsFileAffinityRange rightRange = ranges.get(rightIdx); + + // If block offset is less than start of first range, we don't have affinity key. + if (leftRange.less(blockOff)) + return null; + + if (leftRange.belongs(blockOff)) + return leftRange.status() != RANGE_STATUS_MOVED ? leftRange.affinityKey() : + includeMoved ? leftRange.affinityKey() : null; + + if (rightRange.greater(blockOff)) + return null; + + if (rightRange.belongs(blockOff)) + return rightRange.status() != RANGE_STATUS_MOVED ? rightRange.affinityKey() : + includeMoved ? leftRange.affinityKey() : null; + + while (rightIdx - leftIdx > 1) { + int midIdx = (leftIdx + rightIdx) / 2; + + GridGgfsFileAffinityRange midRange = ranges.get(midIdx); + + if (midRange.belongs(blockOff)) + return midRange.status() != RANGE_STATUS_MOVED ? midRange.affinityKey() : + includeMoved ? leftRange.affinityKey() : null; + + // If offset is less then block start, update right index. + if (midRange.less(blockOff)) + rightIdx = midIdx; + else { + assert midRange.greater(blockOff); + + leftIdx = midIdx; + } + } + + // Range was not found. + return null; + } + + /** + * Updates range status in file map. Will split range into two ranges if given range is a sub-range starting + * from the same offset. + * + * @param range Range to update status. + * @param status New range status. + * @throws IgniteCheckedException If range was not found. + */ + public void updateRangeStatus(GridGgfsFileAffinityRange range, int status) throws IgniteCheckedException { + if (ranges == null) + throw new GridGgfsInvalidRangeException("Failed to update range status (file map is empty) " + + "[range=" + range + ", ranges=" + ranges + ']'); + + assert !ranges.isEmpty(); + + // Check last. + int lastIdx = ranges.size() - 1; + + GridGgfsFileAffinityRange last = ranges.get(lastIdx); + + if (last.startOffset() == range.startOffset()) { + updateRangeStatus0(lastIdx, last, range, status); + + return; + } + + // Check first. + int firstIdx = 0; + + GridGgfsFileAffinityRange first = ranges.get(firstIdx); + + if (first.startOffset() == range.startOffset()) { + updateRangeStatus0(firstIdx, first, range, status); + + return; + } + + // Binary search. + while (lastIdx - firstIdx > 1) { + int midIdx = (firstIdx + lastIdx) / 2; + + GridGgfsFileAffinityRange midRange = ranges.get(midIdx); + + if (midRange.startOffset() == range.startOffset()) { + updateRangeStatus0(midIdx, midRange, range, status); + + return; + } + + // If range we are looking for is less + if (midRange.less(range.startOffset())) + lastIdx = midIdx; + else { + assert midRange.greater(range.startOffset()); + + firstIdx = midIdx; + } + } + + throw new GridGgfsInvalidRangeException("Failed to update map for range (corresponding map range " + + "was not found) [range=" + range + ", status=" + status + ", ranges=" + ranges + ']'); + } + + /** + * Deletes range from map. + * + * @param range Range to delete. + */ + public void deleteRange(GridGgfsFileAffinityRange range) throws IgniteCheckedException { + if (ranges == null) + throw new GridGgfsInvalidRangeException("Failed to remove range (file map is empty) " + + "[range=" + range + ", ranges=" + ranges + ']'); + + assert !ranges.isEmpty(); + + try { + // Check last. + int lastIdx = ranges.size() - 1; + + GridGgfsFileAffinityRange last = ranges.get(lastIdx); + + if (last.regionEqual(range)) { + assert last.status() == RANGE_STATUS_MOVED; + + ranges.remove(last); + + return; + } + + // Check first. + int firstIdx = 0; + + GridGgfsFileAffinityRange first = ranges.get(firstIdx); + + if (first.regionEqual(range)) { + assert first.status() == RANGE_STATUS_MOVED; + + ranges.remove(first); + + return; + } + + // Binary search. + while (lastIdx - firstIdx > 1) { + int midIdx = (firstIdx + lastIdx) / 2; + + GridGgfsFileAffinityRange midRange = ranges.get(midIdx); + + if (midRange.regionEqual(range)) { + assert midRange.status() == RANGE_STATUS_MOVED; + + ranges.remove(midIdx); + + return; + } + + // If range we are looking for is less + if (midRange.less(range.startOffset())) + lastIdx = midIdx; + else { + assert midRange.greater(range.startOffset()); + + firstIdx = midIdx; + } + } + } + finally { + if (ranges.isEmpty()) + ranges = null; + } + + throw new GridGgfsInvalidRangeException("Failed to remove range from file map (corresponding map range " + + "was not found) [range=" + range + ", ranges=" + ranges + ']'); + } + + /** + * Updates range status at given position (will split range into two if necessary). + * + * @param origIdx Original range index. + * @param orig Original range at index. + * @param update Range being updated. + * @param status New status for range. + */ + private void updateRangeStatus0(int origIdx, GridGgfsFileAffinityRange orig, GridGgfsFileAffinityRange update, + int status) { + assert F.eq(orig.affinityKey(), update.affinityKey()); + assert ranges.get(origIdx) == orig; + + if (orig.regionEqual(update)) + ranges.set(origIdx, new GridGgfsFileAffinityRange(update, status)); + else { + // If range was expanded, new one should be larger. + assert orig.endOffset() > update.endOffset(); + + ranges.set(origIdx, new GridGgfsFileAffinityRange(update, status)); + ranges.add(origIdx + 1, new GridGgfsFileAffinityRange(update.endOffset() + 1, orig.endOffset(), + orig.affinityKey())); + } + } + + /** + * Gets full list of ranges present in this map. + * + * @return Unmodifiable list of ranges. + */ + public List<GridGgfsFileAffinityRange> ranges() { + if (ranges == null) + return Collections.emptyList(); + + return Collections.unmodifiableList(ranges); + } + + /** + * Adds range to the list of already existing ranges. Added range must be located after + * the last range in this map. If added range is adjacent to the last range in the map, + * added range will be concatenated to the last one. + * + * @param range Range to add. + */ + public void addRange(GridGgfsFileAffinityRange range) { + if (range == null || range.empty()) + return; + + // We cannot add range in the middle of the file. + if (ranges == null) { + ranges = new ArrayList<>(); + + ranges.add(range); + + return; + } + + assert !ranges.isEmpty(); + + GridGgfsFileAffinityRange last = ranges.get(ranges.size() - 1); + + // Ensure that range being added is located to the right of last range in list. + assert last.greater(range.startOffset()) : "Cannot add range to middle of map [last=" + last + + ", range=" + range + ']'; + + // Try to concat last and new range. + GridGgfsFileAffinityRange concat = last.concat(range); + + // Simply add range to the end of the list if they are not adjacent. + if (concat == null) + ranges.add(range); + else + ranges.set(ranges.size() - 1, concat); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + if (ranges == null) + out.writeInt(-1); + else { + assert !ranges.isEmpty(); + + out.writeInt(ranges.size()); + + for (GridGgfsFileAffinityRange range : ranges) + out.writeObject(range); + } + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + int size = in.readInt(); + + if (size > 0) { + ranges = new ArrayList<>(size); + + for (int i = 0; i < size; i++) + ranges.add((GridGgfsFileAffinityRange)in.readObject()); + } + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridGgfsFileMap.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsFileWorker.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsFileWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsFileWorker.java new file mode 100644 index 0000000..d5875cc --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsFileWorker.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.fs; + +import java.util.concurrent.*; +import java.util.concurrent.locks.*; + +/** + * GGFS file worker for DUAL modes. + */ +public class GridGgfsFileWorker extends GridGgfsThread { + /** Time during which thread remains alive since it's last batch is finished. */ + private static final long THREAD_REUSE_WAIT_TIME = 5000; + + /** Lock */ + private final Lock lock = new ReentrantLock(); + + /** Condition. */ + private final Condition cond = lock.newCondition(); + + /** Next queued batch. */ + private GridGgfsFileWorkerBatch nextBatch; + + /** Batch which is currently being processed. */ + private GridGgfsFileWorkerBatch curBatch; + + /** Cancellation flag. */ + private volatile boolean cancelled; + + /** + * Creates {@code GGFS} file worker. + * + * @param name Worker name. + */ + GridGgfsFileWorker(String name) { + super(name); + } + + /** + * Add worker batch. + * + * @return {@code True} if the batch was actually added. + */ + boolean addBatch(GridGgfsFileWorkerBatch batch) { + assert batch != null; + + lock.lock(); + + try { + if (!cancelled) { + assert nextBatch == null; // Remember, that write operations on a single file are exclusive. + + nextBatch = batch; + + cond.signalAll(); + + return true; + } + else + return false; + } + finally { + lock.unlock(); + } + } + + /** {@inheritDoc} */ + @Override protected void body() throws InterruptedException { + while (!cancelled) { + lock.lock(); + + try { + // If there are no more new batches, wait for several seconds before shutting down the thread. + if (!cancelled && nextBatch == null) + cond.await(THREAD_REUSE_WAIT_TIME, TimeUnit.MILLISECONDS); + + curBatch = nextBatch; + + nextBatch = null; + + if (cancelled && curBatch != null) + curBatch.finish(); // Mark the batch as finished if cancelled. + } + finally { + lock.unlock(); + } + + if (curBatch != null) + curBatch.process(); + else { + lock.lock(); + + try { + // No more new batches, we can safely release the worker as it was inactive for too long. + if (nextBatch == null) + cancelled = true; + } + finally { + lock.unlock(); + } + } + } + } + + /** {@inheritDoc} */ + @Override protected void cleanup() { + assert cancelled; // Cleanup can only be performed on a cancelled worker. + + // Clear interrupted flag. + boolean interrupted = interrupted(); + + // Process the last batch if any. + if (nextBatch != null) + nextBatch.process(); + + onFinish(); + + // Reset interrupted flag. + if (interrupted) + interrupt(); + } + + /** + * Forcefully finish execution of all batches. + */ + void cancel() { + lock.lock(); + + try { + cancelled = true; + + if (curBatch != null) + curBatch.finish(); + + if (nextBatch != null) + nextBatch.finish(); + + cond.signalAll(); // Awake the main loop in case it is still waiting for the next batch. + } + finally { + lock.unlock(); + } + } + + /** + * Get current batch. + * + * @return Current batch. + */ + GridGgfsFileWorkerBatch currentBatch() { + lock.lock(); + + try { + return nextBatch == null ? curBatch : nextBatch; + } + finally { + lock.unlock(); + } + } + + /** + * Callback invoked when worker has processed all it's batches. + */ + protected void onFinish() { + // No-op. + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsFileWorkerBatch.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsFileWorkerBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsFileWorkerBatch.java new file mode 100644 index 0000000..6bfe1c6 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsFileWorkerBatch.java @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.fs; + +import org.apache.ignite.*; +import org.apache.ignite.fs.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.io.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; +import java.util.concurrent.locks.*; + +/** + * Work batch is an abstraction of the logically grouped tasks. + */ +public class GridGgfsFileWorkerBatch { + /** Completion latch. */ + private final CountDownLatch completeLatch = new CountDownLatch(1); + + /** Finish guard. */ + private final AtomicBoolean finishGuard = new AtomicBoolean(); + + /** Lock for finish operation. */ + private final ReadWriteLock finishLock = new ReentrantReadWriteLock(); + + /** Tasks queue. */ + private final BlockingDeque<GridGgfsFileWorkerTask> queue = new LinkedBlockingDeque<>(); + + /** Path to the file in the primary file system. */ + private final IgniteFsPath path; + + /** Output stream to the file. */ + private final OutputStream out; + + /** Caught exception. */ + private volatile IgniteCheckedException err; + + /** Last task marker. */ + private boolean lastTask; + + /** + * Constructor. + * + * @param path Path to the file in the primary file system. + * @param out Output stream opened to that file. + */ + GridGgfsFileWorkerBatch(IgniteFsPath path, OutputStream out) { + assert path != null; + assert out != null; + + this.path = path; + this.out = out; + } + + /** + * Perform write. + * + * @param data Data to be written. + * @return {@code True} in case operation was enqueued. + */ + boolean write(final byte[] data) { + return addTask(new GridGgfsFileWorkerTask() { + @Override public void execute() throws IgniteCheckedException { + try { + out.write(data); + } + catch (IOException e) { + throw new IgniteCheckedException("Failed to write data to the file due to secondary file system " + + "exception: " + path, e); + } + } + }); + } + + /** + * Process the batch. + */ + void process() { + try { + boolean cancelled = false; + + while (!cancelled) { + try { + GridGgfsFileWorkerTask task = queue.poll(1000, TimeUnit.MILLISECONDS); + + if (task == null) + continue; + + task.execute(); + + if (lastTask) + cancelled = true; + } + catch (IgniteCheckedException e) { + err = e; + + cancelled = true; + } + catch (InterruptedException ignore) { + Thread.currentThread().interrupt(); + + cancelled = true; + } + } + } + finally { + try { + onComplete(); + } + finally { + U.closeQuiet(out); + + completeLatch.countDown(); + } + } + } + + /** + * Add the last task to that batch which will release all the resources. + */ + @SuppressWarnings("LockAcquiredButNotSafelyReleased") + void finish() { + if (finishGuard.compareAndSet(false, true)) { + finishLock.writeLock().lock(); + + try { + queue.add(new GridGgfsFileWorkerTask() { + @Override public void execute() { + assert queue.isEmpty(); + + lastTask = true; + } + }); + } + finally { + finishLock.writeLock().unlock(); + } + } + } + + /** + * Await for that worker batch to complete. + * + * @throws IgniteCheckedException In case any exception has occurred during batch tasks processing. + */ + void await() throws IgniteCheckedException { + try { + completeLatch.await(); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + + throw new IgniteInterruptedException(e); + } + + IgniteCheckedException err0 = err; + + if (err0 != null) + throw err0; + } + + /** + * Await for that worker batch to complete in case it was marked as finished. + * + * @throws IgniteCheckedException In case any exception has occurred during batch tasks processing. + */ + void awaitIfFinished() throws IgniteCheckedException { + if (finishGuard.get()) + await(); + } + + /** + * Get primary file system path. + * + * @return Primary file system path. + */ + IgniteFsPath path() { + return path; + } + + /** + * Callback invoked when all the tasks within the batch are completed. + */ + protected void onComplete() { + // No-op. + } + + /** + * Add task to the queue. + * + * @param task Task to add. + * @return {@code True} in case the task was added to the queue. + */ + private boolean addTask(GridGgfsFileWorkerTask task) { + finishLock.readLock().lock(); + + try { + if (!finishGuard.get()) { + try { + queue.put(task); + + return true; + } + catch (InterruptedException ignore) { + // Task was not enqueued due to interruption. + Thread.currentThread().interrupt(); + + return false; + } + } + else + return false; + + } + finally { + finishLock.readLock().unlock(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsFileWorkerTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsFileWorkerTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsFileWorkerTask.java new file mode 100644 index 0000000..afef281 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsFileWorkerTask.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.fs; + +import org.apache.ignite.*; + +/** + * Generic GGFS worker task which could potentially throw an exception. + */ +public interface GridGgfsFileWorkerTask { + /** + * Execute task logic. + * + * @throws IgniteCheckedException If failed. + */ + public void execute() throws IgniteCheckedException; +}
