http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/afa29526/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsFileSystem.java b/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsFileSystem.java new file mode 100644 index 0000000..967e26b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsFileSystem.java @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.ignitefs; + +import org.apache.ignite.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; + +/** + * Common file system interface. It provides a typical generalized "view" of any file system: + * <ul> + * <li>list directories or get information for a single path</li> + * <li>create/move/delete files or directories</li> + * <li>write/read data streams into/from files</li> + * </ul> + * + * This is the minimum of functionality that is needed to work as secondary file system in dual modes of GGFS. + */ +public interface IgniteFsFileSystem { + /** File property: user name. */ + public static final String PROP_USER_NAME = "usrName"; + + /** File property: group name. */ + public static final String PROP_GROUP_NAME = "grpName"; + + /** File property: permission. */ + public static final String PROP_PERMISSION = "permission"; + + /** + * Checks if the specified path exists in the file system. + * + * @param path Path to check for existence in the file system. + * @return {@code True} if such file exists, otherwise - {@code false}. + * @throws IgniteException In case of error. + */ + public boolean exists(IgniteFsPath path); + + /** + * Updates file information for the specified path. Existent properties, not listed in the passed collection, + * will not be affected. Other properties will be added or overwritten. Passed properties with {@code null} values + * will be removed from the stored properties or ignored if they don't exist in the file info. + * <p> + * When working in {@code DUAL_SYNC} or {@code DUAL_ASYNC} modes only the following properties will be propagated + * to the secondary file system: + * <ul> + * <li>{@code usrName} - file owner name;</li> + * <li>{@code grpName} - file owner group;</li> + * <li>{@code permission} - Unix-style string representing file permissions.</li> + * </ul> + * + * @param path File path to set properties for. + * @param props Properties to update. + * @return File information for specified path or {@code null} if such path does not exist. + * @throws IgniteException In case of error. + */ + @Nullable public IgniteFsFile update(IgniteFsPath path, Map<String, String> props) throws IgniteException; + + /** + * Renames/moves a file. + * <p> + * You are free to rename/move data files as you wish, but directories can be only renamed. + * You cannot move the directory between different parent directories. + * <p> + * Examples: + * <ul> + * <li>"/work/file.txt" => "/home/project/Presentation Scenario.txt"</li> + * <li>"/work" => "/work-2012.bkp"</li> + * <li>"/work" => "<strike>/backups/work</strike>" - such operation is restricted for directories.</li> + * </ul> + * + * @param src Source file path to rename. + * @param dest Destination file path. If destination path is a directory, then source file will be placed + * into destination directory with original name. + * @throws IgniteException In case of error. + * @throws IgniteFsFileNotFoundException If source file doesn't exist. + */ + public void rename(IgniteFsPath src, IgniteFsPath dest) throws IgniteException; + + /** + * Deletes file. + * + * @param path File path to delete. + * @param recursive Delete non-empty directories recursively. + * @return {@code True} in case of success, {@code false} otherwise. + * @throws IgniteException In case of error. + */ + public boolean delete(IgniteFsPath path, boolean recursive) throws IgniteException; + + /** + * Creates directories under specified path. + * + * @param path Path of directories chain to create. + * @throws IgniteException In case of error. + */ + public void mkdirs(IgniteFsPath path) throws IgniteException; + + /** + * Creates directories under specified path with the specified properties. + * + * @param path Path of directories chain to create. + * @param props Metadata properties to set on created directories. + * @throws IgniteException In case of error. + */ + public void mkdirs(IgniteFsPath path, @Nullable Map<String, String> props) throws IgniteException; + + /** + * Lists file paths under the specified path. + * + * @param path Path to list files under. + * @return List of files under the specified path. + * @throws IgniteException In case of error. + * @throws IgniteFsFileNotFoundException If path doesn't exist. + */ + public Collection<IgniteFsPath> listPaths(IgniteFsPath path) throws IgniteException; + + /** + * Lists files under the specified path. + * + * @param path Path to list files under. + * @return List of files under the specified path. + * @throws IgniteException In case of error. + * @throws IgniteFsFileNotFoundException If path doesn't exist. + */ + public Collection<IgniteFsFile> listFiles(IgniteFsPath path) throws IgniteException; + + /** + * Opens a file for reading. + * + * @param path File path to read. + * @param bufSize Read buffer size (bytes) or {@code zero} to use default value. + * @return File input stream to read data from. + * @throws IgniteException In case of error. + * @throws IgniteFsFileNotFoundException If path doesn't exist. + */ + public IgniteFsReader open(IgniteFsPath path, int bufSize) throws IgniteException; + + /** + * Creates a file and opens it for writing. + * + * @param path File path to create. + * @param overwrite Overwrite file if it already exists. Note: you cannot overwrite an existent directory. + * @return File output stream to write data to. + * @throws IgniteException In case of error. + */ + public OutputStream create(IgniteFsPath path, boolean overwrite) throws IgniteException; + + /** + * Creates a file and opens it for writing. + * + * @param path File path to create. + * @param bufSize Write buffer size (bytes) or {@code zero} to use default value. + * @param overwrite Overwrite file if it already exists. Note: you cannot overwrite an existent directory. + * @param replication Replication factor. + * @param blockSize Block size. + * @param props File properties to set. + * @return File output stream to write data to. + * @throws IgniteException In case of error. + */ + public OutputStream create(IgniteFsPath path, int bufSize, boolean overwrite, int replication, long blockSize, + @Nullable Map<String, String> props) throws IgniteException; + + /** + * Opens an output stream to an existing file for appending data. + * + * @param path File path to append. + * @param bufSize Write buffer size (bytes) or {@code zero} to use default value. + * @param create Create file if it doesn't exist yet. + * @param props File properties to set only in case it file was just created. + * @return File output stream to append data to. + * @throws IgniteException In case of error. + * @throws IgniteFsFileNotFoundException If path doesn't exist and create flag is {@code false}. + */ + public OutputStream append(IgniteFsPath path, int bufSize, boolean create, @Nullable Map<String, String> props) + throws IgniteException; + + /** + * Gets file information for the specified path. + * + * @param path Path to get information for. + * @return File information for specified path or {@code null} if such path does not exist. + * @throws IgniteException In case of error. + */ + @Nullable public IgniteFsFile info(IgniteFsPath path) throws IgniteException; + + /** + * Gets used space in bytes. + * + * @return Used space in bytes. + * @throws IgniteException In case of error. + */ + public long usedSpaceSize() throws IgniteException; + + /** + * Gets the implementation specific properties of file system. + * + * @return Map of properties. + */ + public Map<String,String> properties(); +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/afa29526/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsGroupDataBlocksKeyMapper.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsGroupDataBlocksKeyMapper.java b/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsGroupDataBlocksKeyMapper.java new file mode 100644 index 0000000..f40b10e --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsGroupDataBlocksKeyMapper.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.ignitefs; + +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.fs.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +/** + * {@code GGFS} class providing ability to group file's data blocks together on one node. + * All blocks within the same group are guaranteed to be cached together on the same node. + * Group size parameter controls how many sequential blocks will be cached together on the same node. + * <p> + * For example, if block size is {@code 64kb} and group size is {@code 256}, then each group will contain + * {@code 64kb * 256 = 16Mb}. Larger group sizes would reduce number of splits required to run map-reduce + * tasks, but will increase inequality of data size being stored on different nodes. + * <p> + * Note that {@link #groupSize()} parameter must correlate to Hadoop split size parameter defined + * in Hadoop via {@code mapred.max.split.size} property. Ideally you want all blocks accessed + * within one split to be mapped to {@code 1} group, so they can be located on the same grid node. + * For example, default Hadoop split size is {@code 64mb} and default {@code GGFS} block size + * is {@code 64kb}. This means that to make sure that each split goes only through blocks on + * the same node (without hopping between nodes over network), we have to make the {@link #groupSize()} + * value be equal to {@code 64mb / 64kb = 1024}. + * <p> + * It is required for {@code GGFS} data cache to be configured with this mapper. Here is an + * example of how it can be specified in XML configuration: + * <pre name="code" class="xml"> + * <bean id="cacheCfgBase" class="org.gridgain.grid.cache.GridCacheConfiguration" abstract="true"> + * ... + * <property name="affinityMapper"> + * <bean class="org.apache.ignite.fs.IgniteFsGroupDataBlocksKeyMapper"> + * <!-- How many sequential blocks will be stored on the same node. --> + * <constructor-arg value="512"/> + * </bean> + * </property> + * ... + * </bean> + * </pre> + */ +public class IgniteFsGroupDataBlocksKeyMapper extends GridCacheDefaultAffinityKeyMapper { + /** */ + private static final long serialVersionUID = 0L; + + /** Size of the group. */ + private final int grpSize; + + /*** + * Constructs affinity mapper to group several data blocks with the same key. + * + * @param grpSize Size of the group in blocks. + */ + public IgniteFsGroupDataBlocksKeyMapper(int grpSize) { + A.ensure(grpSize >= 1, "grpSize >= 1"); + + this.grpSize = grpSize; + } + + /** {@inheritDoc} */ + @Override public Object affinityKey(Object key) { + if (key != null && GridGgfsBlockKey.class.equals(key.getClass())) { + GridGgfsBlockKey blockKey = (GridGgfsBlockKey)key; + + if (blockKey.affinityKey() != null) + return blockKey.affinityKey(); + + long grpId = blockKey.getBlockId() / grpSize; + + return blockKey.getFileId().hashCode() + (int)(grpId ^ (grpId >>> 32)); + } + + return super.affinityKey(key); + } + + /** + * @return Size of the group. + */ + public int groupSize() { + return grpSize; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IgniteFsGroupDataBlocksKeyMapper.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/afa29526/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsInputStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsInputStream.java b/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsInputStream.java new file mode 100644 index 0000000..510b948 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsInputStream.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.ignitefs; + +import java.io.*; + +/** + * {@code GGFS} input stream to read data from the file system. + * It provides several additional methods for asynchronous access. + */ +public abstract class IgniteFsInputStream extends InputStream implements IgniteFsReader { + /** + * Gets file length during file open. + * + * @return File length. + */ + public abstract long length(); + + /** + * Seek to the specified position. + * + * @param pos Position to seek to. + * @throws IOException In case of IO exception. + */ + public abstract void seek(long pos) throws IOException; + + /** + * Get the current position in the input stream. + * + * @return The current position in the input stream. + * @throws IOException In case of IO exception. + */ + public abstract long position() throws IOException; + + /** + * Read bytes from the given position in the stream to the given buffer. + * Continues to read until passed buffer becomes filled. + * + * @param pos Position in the input stream to seek. + * @param buf Buffer into which data is read. + * @throws IOException In case of IO exception. + */ + public abstract void readFully(long pos, byte[] buf) throws IOException; + + /** + * + * @param pos Position in the input stream to seek. + * @param buf Buffer into which data is read. + * @param off Offset in the buffer from which stream data should be written. + * @param len The number of bytes to read. + * @throws IOException In case of IO exception. + */ + public abstract void readFully(long pos, byte[] buf, int off, int len) throws IOException; + + /** + * + * @param pos Position in the input stream to seek. + * @param buf Buffer into which data is read. + * @param off Offset in the buffer from which stream data should be written. + * @param len The number of bytes to read. + * @return Total number of bytes read into the buffer, or -1 if there is no more data (EOF). + * @throws IOException In case of IO exception. + */ + @Override public abstract int read(long pos, byte[] buf, int off, int len) throws IOException; +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/afa29526/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsInvalidHdfsVersionException.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsInvalidHdfsVersionException.java b/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsInvalidHdfsVersionException.java new file mode 100644 index 0000000..e3fe3e2 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsInvalidHdfsVersionException.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.ignitefs; + +/** + * Exception thrown when GridGain detects that remote HDFS version differs from version of HDFS libraries + * in GridGain classpath. + */ +public class IgniteFsInvalidHdfsVersionException extends IgniteFsException { + /** */ + private static final long serialVersionUID = 0L; + + /** + * @param msg Error message. + */ + public IgniteFsInvalidHdfsVersionException(String msg) { + super(msg); + } + + /** + * @param msg Error message. + * @param cause Error cause. + */ + public IgniteFsInvalidHdfsVersionException(String msg, Throwable cause) { + super(msg, cause); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/afa29526/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsInvalidPathException.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsInvalidPathException.java b/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsInvalidPathException.java new file mode 100644 index 0000000..842c6f4 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsInvalidPathException.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.ignitefs; + +import org.jetbrains.annotations.*; + +/** + * {@code GGFS} exception indicating that operation target is invalid + * (e.g. not a file while expecting to be a file). + */ +public class IgniteFsInvalidPathException extends IgniteFsException { + /** */ + private static final long serialVersionUID = 0L; + + /** + * Creates exception with given error message. + * + * @param msg Error message. + */ + public IgniteFsInvalidPathException(String msg) { + super(msg); + } + + /** + * Creates exception with given exception cause. + * + * @param cause Exception cause. + */ + public IgniteFsInvalidPathException(Throwable cause) { + super(cause); + } + + /** + * Creates exception with given error message and exception cause. + * + * @param msg Error message. + * @param cause Error cause. + */ + public IgniteFsInvalidPathException(String msg, @Nullable Throwable cause) { + super(msg, cause); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/afa29526/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsMetrics.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsMetrics.java b/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsMetrics.java new file mode 100644 index 0000000..b45260a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsMetrics.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.ignitefs; + +/** + * {@code GGFS} metrics snapshot for the file system. Note, that some metrics are global and + * some are local (i.e. per each node). + */ +public interface IgniteFsMetrics { + /** + * Gets local used space in bytes. This is the sum of all file chunks stored on local node. + * <p> + * This is a local metric. + * + * @return Node used space in bytes. + */ + public long localSpaceSize(); + + /** + * Gets maximum amount of data that can be stored on local node. This metrics is either + * equal to {@link org.apache.ignite.configuration.IgniteFsConfiguration#getMaxSpaceSize()}, or, if it is {@code 0}, equal to + * {@code 80%} of maximum heap size allocated for JVM. + * + * @return Maximum GGFS local space size. + */ + public long maxSpaceSize(); + + /** + * Get used space in bytes used in the secondary file system. + * <p> + * This is a global metric. + * + * @return Used space in the secondary file system or {@code 0} in case no secondary file system is configured. + */ + public long secondarySpaceSize(); + + /** + * Gets number of directories created in file system. + * <p> + * This is a global metric. + * + * @return Number of directories. + */ + public int directoriesCount(); + + /** + * Gets number of files stored in file system. + * <p> + * This is a global metric. + * + * @return Number of files. + */ + public int filesCount(); + + /** + * Gets number of files that are currently opened for reading. + * <p> + * This is a local metric. + * + * @return Number of opened files. + */ + public int filesOpenedForRead(); + + /** + * Gets number of files that are currently opened for writing. + * <p> + * This is a local metric. + * + * @return Number of opened files. + */ + public int filesOpenedForWrite(); + + /** + * Gets total blocks read, local and remote. + * <p> + * This is a local metric. + * + * @return Total blocks read. + */ + public long blocksReadTotal(); + + /** + * Gets total remote blocks read. + * <p> + * This is a local metric. + * + * @return Total blocks remote read. + */ + public long blocksReadRemote(); + + /** + * Gets total blocks written, local and remote. + * <p> + * This is a local metric. + * + * @return Total blocks written. + */ + public long blocksWrittenTotal(); + + /** + * Gets total remote blocks written. + * <p> + * This is a local metric. + * + * @return Total blocks written. + */ + public long blocksWrittenRemote(); + + /** + * Gets total bytes read. + * <p> + * This is a local metric. + * + * @return Total bytes read. + */ + public long bytesRead(); + + /** + * Gets total bytes read time. + * <p> + * This is a local metric. + * + * @return Total bytes read time. + */ + public long bytesReadTime(); + + /** + * Gets total bytes written. + * <p> + * This is a local metric. + * + * @return Total bytes written. + */ + public long bytesWritten(); + + /** + * Gets total bytes write time. + * <p> + * This is a local metric. + * + * @return Total bytes write time. + */ + public long bytesWriteTime(); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/afa29526/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsMode.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsMode.java b/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsMode.java new file mode 100644 index 0000000..d847387 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsMode.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.ignitefs; + +import org.jetbrains.annotations.*; + +/** + * {@code GGFS} mode defining interactions with underlying secondary Hadoop file system. + * Secondary Hadoop file system is provided for pass-through, write-through, and + * read-through purposes. + * <p> + * This mode is configured via {@link org.apache.ignite.configuration.IgniteFsConfiguration#getDefaultMode()} + * configuration property. + */ +public enum IgniteFsMode { + /** + * In this mode GGFS will not delegate to secondary Hadoop file system and will + * cache all the files in memory only. + */ + PRIMARY, + + /** + * In this mode GGFS will not cache any files in memory and will only pass them + * through to secondary Hadoop file system. If this mode is enabled, then + * secondary Hadoop file system must be configured. + * + * @see org.apache.ignite.configuration.IgniteFsConfiguration#getSecondaryHadoopFileSystemUri() + */ + PROXY, + + /** + * In this mode {@code GGFS} will cache files locally and also <i>synchronously</i> + * write them through to secondary Hadoop file system. + * <p> + * If secondary Hadoop file system is not configured, then this mode behaves like + * {@link #PRIMARY} mode. + * + * @see org.apache.ignite.configuration.IgniteFsConfiguration#getSecondaryHadoopFileSystemUri() + */ + DUAL_SYNC, + + /** + * In this mode {@code GGFS} will cache files locally and also <i>asynchronously</i> + * write them through to secondary Hadoop file system. + * <p> + * If secondary Hadoop file system is not configured, then this mode behaves like + * {@link #PRIMARY} mode. + * + * @see org.apache.ignite.configuration.IgniteFsConfiguration#getSecondaryHadoopFileSystemUri() + */ + DUAL_ASYNC; + + /** Enumerated values. */ + private static final IgniteFsMode[] VALS = values(); + + /** + * Efficiently gets enumerated value from its ordinal. + * + * @param ord Ordinal value. + * @return Enumerated value or {@code null} if ordinal out of range. + */ + @Nullable public static IgniteFsMode fromOrdinal(int ord) { + return ord >= 0 && ord < VALS.length ? VALS[ord] : null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/afa29526/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsOutOfSpaceException.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsOutOfSpaceException.java b/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsOutOfSpaceException.java new file mode 100644 index 0000000..6fc354e --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsOutOfSpaceException.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.ignitefs; + +import org.jetbrains.annotations.*; + +/** + * {@code GGFS} exception that is thrown when it detected out-of-space condition. + * It is thrown when number of writes written to a {@code GGFS} data nodes exceeds + * its maximum value (that is configured per-node). + */ +public class IgniteFsOutOfSpaceException extends IgniteFsException { + /** */ + private static final long serialVersionUID = 0L; + + /** + * Creates exception with given error message. + * + * @param msg Error message. + */ + public IgniteFsOutOfSpaceException(String msg) { + super(msg); + } + + /** + * Creates an instance of exception with given exception cause. + * + * @param cause Exception cause. + */ + public IgniteFsOutOfSpaceException(Throwable cause) { + super(cause); + } + + /** + * Creates an instance of GGFS exception with given error message and given exception cause. + * + * @param msg Error message. + * @param cause Exception cause. + */ + public IgniteFsOutOfSpaceException(String msg, @Nullable Throwable cause) { + super(msg, cause); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/afa29526/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsOutputStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsOutputStream.java b/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsOutputStream.java new file mode 100644 index 0000000..9cb8181 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsOutputStream.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.ignitefs; + +import java.io.*; + +/** + * {@code GGFS} output stream to write data into the file system. + */ +public abstract class IgniteFsOutputStream extends OutputStream { + /** + * Transfers specified amount of bytes from data input to this output stream. + * This method is optimized to avoid unnecessary temporal buffer creation and byte array copy. + * + * @param in Data input to copy bytes from. + * @param len Data length to copy. + * @throws IOException If write failed, read from input failed or there is no enough data in data input. + */ + public abstract void transferFrom(DataInput in, int len) throws IOException; +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/afa29526/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsParentNotDirectoryException.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsParentNotDirectoryException.java b/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsParentNotDirectoryException.java new file mode 100644 index 0000000..04a1383 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsParentNotDirectoryException.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.ignitefs; + +import org.jetbrains.annotations.*; + +/** + * Exception thrown when parent supposed to be a directory is a file. + */ +public class IgniteFsParentNotDirectoryException extends IgniteFsInvalidPathException { + /** */ + private static final long serialVersionUID = 0L; + + /** + * @param msg Error message. + */ + public IgniteFsParentNotDirectoryException(String msg) { + super(msg); + } + + /** + * @param cause Exception cause. + */ + public IgniteFsParentNotDirectoryException(Throwable cause) { + super(cause); + } + + /** + * @param msg Error message. + * @param cause Exception cause. + */ + public IgniteFsParentNotDirectoryException(String msg, @Nullable Throwable cause) { + super(msg, cause); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/afa29526/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsPath.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsPath.java b/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsPath.java new file mode 100644 index 0000000..f6e1d14 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsPath.java @@ -0,0 +1,262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.ignitefs; + +import org.apache.ignite.internal.util.io.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.net.*; +import java.util.*; + +/** + * {@code GGFS} path to file in the file system. For example, to get information about + * a file you would use the following code: + * <pre name="code" class="java"> + * GridGgfsPath dirPath = new GridGgfsPath("/my/working/dir"); + * GridGgfsPath filePath = new GridGgfsPath(dirPath, "file.txt"); + * + * // Get metadata about file. + * GridGgfsFile file = ggfs.info(filePath); + * </pre> + */ +public final class IgniteFsPath implements Comparable<IgniteFsPath>, Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** The directory separator character. */ + private static final char SLASH_CHAR = '/'; + + /** The directory separator. */ + private static final String SLASH = "/"; + + /** URI representing this path. Should never change after object creation or de-serialization. */ + private String path; + + /** + * Constructs default root path. + */ + public IgniteFsPath() { + path = SLASH; + } + + /** + * Constructs a path from an URI + * + * @param uri URI to create path from. + */ + public IgniteFsPath(URI uri) { + A.notNull(uri, "uri"); + + path = normalizePath(uri.getPath()); + } + + /** + * Constructs a path from the URI string. + * + * @param path URI string. + */ + public IgniteFsPath(String path) { + A.ensure(!F.isEmpty(path), "'path' is null or empty"); + + this.path = normalizePath(path); + } + + /** + * Resolve a child path against a parent path. + * + * @param parentPath Parent path. + * @param childPath Child path. + */ + public IgniteFsPath(IgniteFsPath parentPath, String childPath) { + A.notNull(parentPath, "parentPath"); + + String path = GridFilenameUtils.concat(parentPath.path, childPath); + + if (F.isEmpty(path)) + throw new IllegalArgumentException("Failed to parse path" + + " [parent=" + parentPath + ", childPath=" + childPath + ']'); + + this.path = normalizePath(path); + } + + /** + * Initialize path with (1) not-null, (2) normalized, (3) absolute and (4) unix-format path component. + * + * @param path Path. + * @return Normalized path. + */ + private static String normalizePath(String path) { + assert path != null; + + String normalizedPath = GridFilenameUtils.normalizeNoEndSeparator(path, true); + + if (F.isEmpty(normalizedPath)) + throw new IllegalArgumentException("Failed to normalize path: " + path); + + if (!SLASH.equals(GridFilenameUtils.getPrefix(normalizedPath))) + throw new IllegalArgumentException("Path should be absolute: " + path); + + assert !normalizedPath.isEmpty() : "Expects normalized path is not empty."; + assert normalizedPath.length() == 1 || !normalizedPath.endsWith(SLASH) : + "Expects normalized path is root or don't ends with '/' symbol."; + + return normalizedPath; + } + + /** + * Returns the final component of this path. + * + * @return The final component of this path. + */ + public String name() { + return GridFilenameUtils.getName(path); + } + + /** + * Returns a root for this path. + * + * @return Root for this path. + */ + public IgniteFsPath root() { + return new IgniteFsPath(); + } + + /** + * Split full path on components. + * + * @return Path components. + */ + public List<String> components() { + String path = this.path; + + assert path.length() >= 1 : "Path expected to be absolute: " + path; + + // Path is short-living object, so we don't need to cache component's resolution result. + return path.length() == 1 ? Collections.<String>emptyList() : Arrays.asList(path.substring(1).split(SLASH)); + } + + /** + * Returns the parent of a path or {@code null} if at root. + * + * @return The parent of a path or {@code null} if at root. + */ + @Nullable public IgniteFsPath parent() { + String path = this.path; + + if (path.length() == 1) + return null; // Current path is root. + + path = GridFilenameUtils.getFullPathNoEndSeparator(path); + + return new IgniteFsPath(path); + } + + /** + * Adds a suffix to the final name in the path. + * + * @param suffix Suffix. + * @return Path with suffix. + */ + public IgniteFsPath suffix(String suffix) { + A.ensure(!F.isEmpty(suffix), "'suffix' is null or empty."); + A.ensure(!suffix.contains(SLASH), "'suffix' contains file's separator '" + SLASH + "'"); + + return new IgniteFsPath(path + suffix); + } + + /** + * Return the number of elements in this path. + * + * @return The number of elements in this path, zero depth means root directory. + */ + public int depth() { + final String path = this.path; + final int size = path.length(); + + assert size >= 1 && path.charAt(0) == SLASH_CHAR : "Expects absolute path: " + path; + + if (size == 1) + return 0; + + int depth = 1; + + // Ignore the first character. + for (int i = 1; i < size; i++) + if (path.charAt(i) == SLASH_CHAR) + depth++; + + return depth; + } + + /** + * Checks whether this path is a sub-directory of argument. + * + * @param path Path to check. + * @return {@code True} if argument is same or a sub-directory of this object. + */ + public boolean isSubDirectoryOf(IgniteFsPath path) { + A.notNull(path, "path"); + + return this.path.startsWith(path.path.endsWith(SLASH) ? path.path : path.path + SLASH); + } + + /** + * Checks if paths are identical. + * + * @param path Path to check. + * @return {@code True} if paths are identical. + */ + public boolean isSame(IgniteFsPath path) { + A.notNull(path, "path"); + + return this == path || this.path.equals(path.path); + } + + /** {@inheritDoc} */ + @Override public int compareTo(IgniteFsPath o) { + return path.compareTo(o.path); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + U.writeString(out, path); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException { + path = U.readString(in); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return path.hashCode(); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + return o == this || o != null && getClass() == o.getClass() && path.equals(((IgniteFsPath)o).path); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return path; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/afa29526/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsPathAlreadyExistsException.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsPathAlreadyExistsException.java b/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsPathAlreadyExistsException.java new file mode 100644 index 0000000..17fa71a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsPathAlreadyExistsException.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.ignitefs; + +import org.jetbrains.annotations.*; + +/** + * Exception thrown when target path supposed to be created already exists. + */ +public class IgniteFsPathAlreadyExistsException extends IgniteFsInvalidPathException { + /** */ + private static final long serialVersionUID = 0L; + + /** + * @param msg Error message. + */ + public IgniteFsPathAlreadyExistsException(String msg) { + super(msg); + } + + /** + * @param cause Exception cause. + */ + public IgniteFsPathAlreadyExistsException(Throwable cause) { + super(cause); + } + + /** + * @param msg Error message. + * @param cause Exception cause. + */ + public IgniteFsPathAlreadyExistsException(String msg, @Nullable Throwable cause) { + super(msg, cause); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/afa29526/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsPathSummary.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsPathSummary.java b/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsPathSummary.java new file mode 100644 index 0000000..a03e704 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsPathSummary.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.ignitefs; + +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.io.*; + +/** + * Path summary: total files count, total directories count, total length. + */ +public class IgniteFsPathSummary implements Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** Path. */ + private IgniteFsPath path; + + /** File count. */ + private int filesCnt; + + /** Directories count. */ + private int dirCnt; + + /** Length consumed. */ + private long totalLen; + + /** + * Empty constructor required by {@link Externalizable}. + */ + public IgniteFsPathSummary() { + // No-op. + } + + /** + * Construct empty path summary. + * + * @param path Path. + */ + public IgniteFsPathSummary(IgniteFsPath path) { + this.path = path; + } + + /** + * @return Files count. + */ + public int filesCount() { + return filesCnt; + } + + /** + * @param filesCnt Files count. + */ + public void filesCount(int filesCnt) { + this.filesCnt = filesCnt; + } + + /** + * @return Directories count. + */ + public int directoriesCount() { + return dirCnt; + } + + /** + * @param dirCnt Directories count. + */ + public void directoriesCount(int dirCnt) { + this.dirCnt = dirCnt; + } + + /** + * @return Total length. + */ + public long totalLength() { + return totalLen; + } + + /** + * @param totalLen Total length. + */ + public void totalLength(long totalLen) { + this.totalLen = totalLen; + } + + /** + * @return Path for which summary is obtained. + */ + public IgniteFsPath path() { + return path; + } + + /** + * @param path Path for which summary is obtained. + */ + public void path(IgniteFsPath path) { + this.path = path; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeInt(filesCnt); + out.writeInt(dirCnt); + out.writeLong(totalLen); + + path.writeExternal(out); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + filesCnt = in.readInt(); + dirCnt = in.readInt(); + totalLen = in.readLong(); + + path = new IgniteFsPath(); + path.readExternal(in); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IgniteFsPathSummary.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/afa29526/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsReader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsReader.java b/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsReader.java new file mode 100644 index 0000000..6e16792 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsReader.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.ignitefs; + +import java.io.*; + +/** + * The simplest data input interface to read from secondary file system in dual modes. + */ +public interface IgniteFsReader extends Closeable { + /** + * Read up to the specified number of bytes, from a given position within a file, and return the number of bytes + * read. + * + * @param pos Position in the input stream to seek. + * @param buf Buffer into which data is read. + * @param off Offset in the buffer from which stream data should be written. + * @param len The number of bytes to read. + * @return Total number of bytes read into the buffer, or -1 if there is no more data (EOF). + * @throws IOException In case of any exception. + */ + public int read(long pos, byte[] buf, int off, int len) throws IOException; +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/afa29526/modules/core/src/main/java/org/apache/ignite/ignitefs/mapreduce/IgniteFsFileRange.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/ignitefs/mapreduce/IgniteFsFileRange.java b/modules/core/src/main/java/org/apache/ignite/ignitefs/mapreduce/IgniteFsFileRange.java new file mode 100644 index 0000000..21e2710 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/ignitefs/mapreduce/IgniteFsFileRange.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.ignitefs.mapreduce; + +import org.apache.ignite.ignitefs.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +/** + * Entity representing part of GGFS file identified by file path, start position, and length. + */ +public class IgniteFsFileRange { + /** File path. */ + private IgniteFsPath path; + + /** Start position. */ + private long start; + + /** Length. */ + private long len; + + /** + * Creates file range. + * + * @param path File path. + * @param start Start position. + * @param len Length. + */ + public IgniteFsFileRange(IgniteFsPath path, long start, long len) { + this.path = path; + this.start = start; + this.len = len; + } + + /** + * Gets file path. + * + * @return File path. + */ + public IgniteFsPath path() { + return path; + } + + /** + * Gets range start position. + * + * @return Start position. + */ + public long start() { + return start; + } + + /** + * Gets range length. + * + * @return Length. + */ + public long length() { + return len; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IgniteFsFileRange.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/afa29526/modules/core/src/main/java/org/apache/ignite/ignitefs/mapreduce/IgniteFsInputStreamJobAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/ignitefs/mapreduce/IgniteFsInputStreamJobAdapter.java b/modules/core/src/main/java/org/apache/ignite/ignitefs/mapreduce/IgniteFsInputStreamJobAdapter.java new file mode 100644 index 0000000..75cf6d2 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/ignitefs/mapreduce/IgniteFsInputStreamJobAdapter.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.ignitefs.mapreduce; + +import org.apache.ignite.*; +import org.apache.ignite.ignitefs.*; +import org.apache.ignite.internal.util.*; + +import java.io.*; + +/** + * Convenient {@link IgniteFsJob} adapter. It limits data returned from {@link org.apache.ignite.ignitefs.IgniteFsInputStream} to bytes within + * the {@link IgniteFsFileRange} assigned to the job. + * <p> + * Under the covers it simply puts job's {@code GridGgfsInputStream} position to range start and wraps in into + * {@link GridFixedSizeInputStream} limited to range length. + */ +public abstract class IgniteFsInputStreamJobAdapter extends IgniteFsJobAdapter { + /** {@inheritDoc} */ + @Override public final Object execute(IgniteFs ggfs, IgniteFsFileRange range, IgniteFsInputStream in) + throws IgniteException, IOException { + in.seek(range.start()); + + return execute(ggfs, new IgniteFsRangeInputStream(in, range)); + } + + /** + * Executes this job. + * + * @param ggfs GGFS instance. + * @param in Input stream. + * @return Execution result. + * @throws IgniteException If execution failed. + * @throws IOException If IO exception encountered while working with stream. + */ + public abstract Object execute(IgniteFs ggfs, IgniteFsRangeInputStream in) throws IgniteException, IOException; +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/afa29526/modules/core/src/main/java/org/apache/ignite/ignitefs/mapreduce/IgniteFsJob.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/ignitefs/mapreduce/IgniteFsJob.java b/modules/core/src/main/java/org/apache/ignite/ignitefs/mapreduce/IgniteFsJob.java new file mode 100644 index 0000000..e1b481d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/ignitefs/mapreduce/IgniteFsJob.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.ignitefs.mapreduce; + +import org.apache.ignite.*; +import org.apache.ignite.ignitefs.*; + +import java.io.*; + +/** + * Defines executable unit for {@link IgniteFsTask}. Before this job is executed, it is assigned one of the + * ranges provided by the {@link IgniteFsRecordResolver} passed to one of the {@code GridGgfs.execute(...)} methods. + * <p> + * {@link #execute(org.apache.ignite.IgniteFs, IgniteFsFileRange, org.apache.ignite.ignitefs.IgniteFsInputStream)} method is given {@link IgniteFsFileRange} this + * job is expected to operate on, and already opened {@link org.apache.ignite.ignitefs.IgniteFsInputStream} for the file this range belongs to. + * <p> + * Note that provided input stream has position already adjusted to range start. However, it will not + * automatically stop on range end. This is done to provide capability in some cases to look beyond + * the range end or seek position before the reange start. + * <p> + * In majority of the cases, when you want to process only provided range, you should explicitly control amount + * of returned data and stop at range end. You can also use {@link IgniteFsInputStreamJobAdapter}, which operates + * on {@link IgniteFsRangeInputStream} bounded to range start and end, or manually wrap provided input stream with + * {@link IgniteFsRangeInputStream}. + * <p> + * You can inject any resources in concrete implementation, just as with regular {@link org.apache.ignite.compute.ComputeJob} implementations. + */ +public interface IgniteFsJob { + /** + * Executes this job. + * + * @param ggfs GGFS instance. + * @param range File range aligned to record boundaries. + * @param in Input stream for split file. This input stream is not aligned to range and points to file start + * by default. + * @return Execution result. + * @throws IgniteException If execution failed. + * @throws IOException If file system operation resulted in IO exception. + */ + public Object execute(IgniteFs ggfs, IgniteFsFileRange range, IgniteFsInputStream in) throws IgniteException, + IOException; + + /** + * This method is called when system detects that completion of this + * job can no longer alter the overall outcome (for example, when parent task + * has already reduced the results). Job is also cancelled when + * {@link org.apache.ignite.compute.ComputeTaskFuture#cancel()} is called. + * <p> + * Note that job cancellation is only a hint, and just like with + * {@link Thread#interrupt()} method, it is really up to the actual job + * instance to gracefully finish execution and exit. + */ + public void cancel(); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/afa29526/modules/core/src/main/java/org/apache/ignite/ignitefs/mapreduce/IgniteFsJobAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/ignitefs/mapreduce/IgniteFsJobAdapter.java b/modules/core/src/main/java/org/apache/ignite/ignitefs/mapreduce/IgniteFsJobAdapter.java new file mode 100644 index 0000000..ba907f2 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/ignitefs/mapreduce/IgniteFsJobAdapter.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.ignitefs.mapreduce; + +/** + * Adapter for {@link IgniteFsJob} with no-op implementation of {@link #cancel()} method. + */ +public abstract class IgniteFsJobAdapter implements IgniteFsJob { + /** {@inheritDoc} */ + @Override public void cancel() { + // No-op. + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/afa29526/modules/core/src/main/java/org/apache/ignite/ignitefs/mapreduce/IgniteFsRangeInputStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/ignitefs/mapreduce/IgniteFsRangeInputStream.java b/modules/core/src/main/java/org/apache/ignite/ignitefs/mapreduce/IgniteFsRangeInputStream.java new file mode 100644 index 0000000..b6df002 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/ignitefs/mapreduce/IgniteFsRangeInputStream.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.ignitefs.mapreduce; + +import org.apache.ignite.ignitefs.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import java.io.*; + +/** + * Decorator for regular {@link org.apache.ignite.ignitefs.IgniteFsInputStream} which streams only data within the given range. + * This stream is used for {@link IgniteFsInputStreamJobAdapter} convenience adapter to create + * jobs which will be working only with the assigned range. You can also use it explicitly when + * working with {@link IgniteFsJob} directly. + */ +public final class IgniteFsRangeInputStream extends IgniteFsInputStream { + /** Base input stream. */ + private final IgniteFsInputStream is; + + /** Start position. */ + private final long start; + + /** Maximum stream length. */ + private final long maxLen; + + /** Current position within the stream. */ + private long pos; + + /** + * Constructor. + * + * @param is Base input stream. + * @param start Start position. + * @param maxLen Maximum stream length. + * @throws IOException In case of exception. + */ + public IgniteFsRangeInputStream(IgniteFsInputStream is, long start, long maxLen) throws IOException { + if (is == null) + throw new IllegalArgumentException("Input stream cannot be null."); + + if (start < 0) + throw new IllegalArgumentException("Start position cannot be negative."); + + if (start >= is.length()) + throw new IllegalArgumentException("Start position cannot be greater that file length."); + + if (maxLen < 0) + throw new IllegalArgumentException("Length cannot be negative."); + + if (start + maxLen > is.length()) + throw new IllegalArgumentException("Sum of start position and length cannot be greater than file length."); + + this.is = is; + this.start = start; + this.maxLen = maxLen; + + is.seek(start); + } + + /** {@inheritDoc} */ + @Override public long length() { + return is.length(); + } + + /** + * Constructor. + * + * @param is Base input stream. + * @param range File range. + * @throws IOException In case of exception. + */ + public IgniteFsRangeInputStream(IgniteFsInputStream is, IgniteFsFileRange range) throws IOException { + this(is, range.start(), range.length()); + } + + /** {@inheritDoc} */ + @Override public int read() throws IOException { + if (pos < maxLen) { + int res = is.read(); + + if (res != -1) + pos++; + + return res; + } + else + return -1; + } + + /** {@inheritDoc} */ + @Override public int read(@NotNull byte[] b, int off, int len) throws IOException { + if (pos < maxLen) { + len = (int)Math.min(len, maxLen - pos); + + int res = is.read(b, off, len); + + if (res != -1) + pos += res; + + return res; + } + else + return -1; + } + + /** {@inheritDoc} */ + @Override public int read(long pos, byte[] buf, int off, int len) throws IOException { + seek(pos); + + return read(buf, off, len); + } + + /** {@inheritDoc} */ + @Override public void readFully(long pos, byte[] buf) throws IOException { + readFully(pos, buf, 0, buf.length); + } + + /** {@inheritDoc} */ + @Override public void readFully(long pos, byte[] buf, int off, int len) throws IOException { + seek(pos); + + for (int readBytes = 0; readBytes < len;) { + int read = read(buf, off + readBytes, len - readBytes); + + if (read == -1) + throw new EOFException("Failed to read stream fully (stream ends unexpectedly) [pos=" + pos + + ", buf.length=" + buf.length + ", off=" + off + ", len=" + len + ']'); + + readBytes += read; + } + } + + /** {@inheritDoc} */ + @Override public void seek(long pos) throws IOException { + if (pos < 0) + throw new IOException("Seek position cannot be negative: " + pos); + + is.seek(start + pos); + + this.pos = pos; + } + + /** {@inheritDoc} */ + @Override public long position() { + return pos; + } + + /** + * Since range input stream represents a part of larger file stream, there is an offset at which this + * range input stream starts in original input stream. This method returns start offset of this input + * stream relative to original input stream. + * + * @return Start offset in original input stream. + */ + public long startOffset() { + return start; + } + + /** {@inheritDoc} */ + @Override public int available() { + long l = maxLen - pos; + + if (l < 0) + return 0; + + if (l > Integer.MAX_VALUE) + return Integer.MAX_VALUE; + + return (int)l; + } + + /** {@inheritDoc} */ + @Override public void close() throws IOException { + is.close(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IgniteFsRangeInputStream.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/afa29526/modules/core/src/main/java/org/apache/ignite/ignitefs/mapreduce/IgniteFsRecordResolver.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/ignitefs/mapreduce/IgniteFsRecordResolver.java b/modules/core/src/main/java/org/apache/ignite/ignitefs/mapreduce/IgniteFsRecordResolver.java new file mode 100644 index 0000000..3efd411 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/ignitefs/mapreduce/IgniteFsRecordResolver.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.ignitefs.mapreduce; + +import org.apache.ignite.*; +import org.apache.ignite.ignitefs.*; +import org.jetbrains.annotations.*; + +import java.io.*; + +/** + * GGFS record resolver. When {@link IgniteFsTask} is split into {@link IgniteFsJob}s each produced job will obtain + * {@link IgniteFsFileRange} based on file data location. Record resolver is invoked in each job before actual + * execution in order to adjust record boundaries in a way consistent with user data. + * <p> + * E.g., you may want to split your task into jobs so that each job process zero, one or several lines from that file. + * But file is split into ranges based on block locations, not new line boundaries. Using convenient record resolver + * you can adjust job range so that it covers the whole line(s). + * <p> + * The following record resolvers are available out of the box: + * <ul> + * <li>{@link org.apache.ignite.ignitefs.mapreduce.records.IgniteFsFixedLengthRecordResolver}</li> + * <li>{@link org.apache.ignite.ignitefs.mapreduce.records.IgniteFsByteDelimiterRecordResolver}</li> + * <li>{@link org.apache.ignite.ignitefs.mapreduce.records.IgniteFsStringDelimiterRecordResolver}</li> + * <li>{@link org.apache.ignite.ignitefs.mapreduce.records.IgniteFsNewLineRecordResolver}</li> + * </ul> + */ +public interface IgniteFsRecordResolver extends Serializable { + /** + * Adjusts record start offset and length. + * + * @param fs IgniteFs instance to use. + * @param stream Input stream for split file. + * @param suggestedRecord Suggested file system record. + * @return New adjusted record. If this method returns {@code null}, original record is ignored. + * @throws IgniteException If resolve failed. + * @throws IOException If resolve failed. + */ + @Nullable public IgniteFsFileRange resolveRecords(IgniteFs fs, IgniteFsInputStream stream, + IgniteFsFileRange suggestedRecord) throws IgniteException, IOException; +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/afa29526/modules/core/src/main/java/org/apache/ignite/ignitefs/mapreduce/IgniteFsTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/ignitefs/mapreduce/IgniteFsTask.java b/modules/core/src/main/java/org/apache/ignite/ignitefs/mapreduce/IgniteFsTask.java new file mode 100644 index 0000000..88463e6 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/ignitefs/mapreduce/IgniteFsTask.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.ignitefs.mapreduce; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.ignitefs.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.fs.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.resources.*; +import org.jetbrains.annotations.*; + +import java.util.*; + +/** + * GGFS task which can be executed on the grid using one of {@code GridGgfs.execute()} methods. Essentially GGFS task + * is regular {@link org.apache.ignite.compute.ComputeTask} with different map logic. Instead of implementing + * {@link org.apache.ignite.compute.ComputeTask#map(List, Object)} method to split task into jobs, you must implement + * {@link IgniteFsTask#createJob(org.apache.ignite.ignitefs.IgniteFsPath, IgniteFsFileRange, IgniteFsTaskArgs)} method. + * <p> + * Each file participating in GGFS task is split into {@link IgniteFsFileRange}s first. Normally range is a number of + * consequent bytes located on a single node (see {@code IgniteFsGroupDataBlocksKeyMapper}). In case maximum range size + * is provided (either through {@link org.apache.ignite.configuration.IgniteFsConfiguration#getMaximumTaskRangeLength()} or {@code GridGgfs.execute()} + * argument), then ranges could be further divided into smaller chunks. + * <p> + * Once file is split into ranges, each range is passed to {@code GridGgfsTask.createJob()} method in order to create a + * {@link IgniteFsJob}. + * <p> + * Finally all generated jobs are sent to Grid nodes for execution. + * <p> + * As with regular {@code GridComputeTask} you can define your own logic for results handling and reduce step. + * <p> + * Here is an example of such a task: + * <pre name="code" class="java"> + * public class WordCountTask extends GridGgfsTask<String, Integer> { + * @Override + * public GridGgfsJob createJob(GridGgfsPath path, GridGgfsFileRange range, GridGgfsTaskArgs<T> args) throws IgniteCheckedException { + * // New job will be created for each range within each file. + * // We pass user-provided argument (which is essentially a word to look for) to that job. + * return new WordCountJob(args.userArgument()); + * } + * + * // Aggregate results into one compound result. + * public Integer reduce(List<GridComputeJobResult> results) throws IgniteCheckedException { + * Integer total = 0; + * + * for (GridComputeJobResult res : results) { + * Integer cnt = res.getData(); + * + * // Null can be returned for non-existent file in case we decide to ignore such situations. + * if (cnt != null) + * total += cnt; + * } + * + * return total; + * } + * } + * </pre> + */ +public abstract class IgniteFsTask<T, R> extends ComputeTaskAdapter<IgniteFsTaskArgs<T>, R> { + /** */ + private static final long serialVersionUID = 0L; + + /** Injected grid. */ + @IgniteInstanceResource + private Ignite ignite; + + /** {@inheritDoc} */ + @Nullable @Override public final Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, + @Nullable IgniteFsTaskArgs<T> args) { + assert ignite != null; + assert args != null; + + IgniteFs fs = ignite.fileSystem(args.ggfsName()); + IgniteFsProcessorAdapter ggfsProc = ((IgniteKernal) ignite).context().ggfs(); + + Map<ComputeJob, ClusterNode> splitMap = new HashMap<>(); + + Map<UUID, ClusterNode> nodes = mapSubgrid(subgrid); + + for (IgniteFsPath path : args.paths()) { + IgniteFsFile file = fs.info(path); + + if (file == null) { + if (args.skipNonExistentFiles()) + continue; + else + throw new IgniteException("Failed to process IgniteFs file because it doesn't exist: " + path); + } + + Collection<IgniteFsBlockLocation> aff = fs.affinity(path, 0, file.length(), args.maxRangeLength()); + + long totalLen = 0; + + for (IgniteFsBlockLocation loc : aff) { + ClusterNode node = null; + + for (UUID nodeId : loc.nodeIds()) { + node = nodes.get(nodeId); + + if (node != null) + break; + } + + if (node == null) + throw new IgniteException("Failed to find any of block affinity nodes in subgrid [loc=" + loc + + ", subgrid=" + subgrid + ']'); + + IgniteFsJob job = createJob(path, new IgniteFsFileRange(file.path(), loc.start(), loc.length()), args); + + if (job != null) { + ComputeJob jobImpl = ggfsProc.createJob(job, fs.name(), file.path(), loc.start(), + loc.length(), args.recordResolver()); + + splitMap.put(jobImpl, node); + } + + totalLen += loc.length(); + } + + assert totalLen == file.length(); + } + + return splitMap; + } + + /** + * Callback invoked during task map procedure to create job that will process specified split + * for GGFS file. + * + * @param path Path. + * @param range File range based on consecutive blocks. This range will be further + * realigned to record boundaries on destination node. + * @param args Task argument. + * @return GGFS job. If {@code null} is returned, the passed in file range will be skipped. + * @throws IgniteException If job creation failed. + */ + @Nullable public abstract IgniteFsJob createJob(IgniteFsPath path, IgniteFsFileRange range, + IgniteFsTaskArgs<T> args) throws IgniteException; + + /** + * Maps list by node ID. + * + * @param subgrid Subgrid. + * @return Map. + */ + private Map<UUID, ClusterNode> mapSubgrid(Collection<ClusterNode> subgrid) { + Map<UUID, ClusterNode> res = U.newHashMap(subgrid.size()); + + for (ClusterNode node : subgrid) + res.put(node.id(), node); + + return res; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/afa29526/modules/core/src/main/java/org/apache/ignite/ignitefs/mapreduce/IgniteFsTaskArgs.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/ignitefs/mapreduce/IgniteFsTaskArgs.java b/modules/core/src/main/java/org/apache/ignite/ignitefs/mapreduce/IgniteFsTaskArgs.java new file mode 100644 index 0000000..4be5001 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/ignitefs/mapreduce/IgniteFsTaskArgs.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.ignitefs.mapreduce; + +import org.apache.ignite.ignitefs.*; + +import java.util.*; + +/** + * GGFS task arguments. When you initiate new GGFS task execution using one of {@code GridGgfs.execute(...)} methods, + * all passed parameters are encapsulated in a single {@code GridGgfsTaskArgs} object. Later on this object is + * passed to {@link IgniteFsTask#createJob(org.apache.ignite.ignitefs.IgniteFsPath, IgniteFsFileRange, IgniteFsTaskArgs)} method. + * <p> + * Task arguments encapsulates the following data: + * <ul> + * <li>GGFS name</li> + * <li>File paths passed to {@code GridGgfs.execute()} method</li> + * <li>{@link IgniteFsRecordResolver} for that task</li> + * <li>Flag indicating whether to skip non-existent file paths or throw an exception</li> + * <li>User-defined task argument</li> + * <li>Maximum file range length for that task (see {@link org.apache.ignite.configuration.IgniteFsConfiguration#getMaximumTaskRangeLength()})</li> + * </ul> + */ +public interface IgniteFsTaskArgs<T> { + /** + * Gets GGFS name. + * + * @return GGFS name. + */ + public String ggfsName(); + + /** + * Gets file paths to process. + * + * @return File paths to process. + */ + public Collection<IgniteFsPath> paths(); + + /** + * Gets record resolver for the task. + * + * @return Record resolver. + */ + public IgniteFsRecordResolver recordResolver(); + + /** + * Flag indicating whether to fail or simply skip non-existent files. + * + * @return {@code True} if non-existent files should be skipped. + */ + public boolean skipNonExistentFiles(); + + /** + * User argument provided for task execution. + * + * @return User argument. + */ + public T userArgument(); + + /** + * Optional maximum allowed range length, {@code 0} by default. If not specified, full range including + * all consecutive blocks will be used without any limitations. + * + * @return Maximum range length. + */ + public long maxRangeLength(); +}