http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopPerformanceCounter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopPerformanceCounter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopPerformanceCounter.java new file mode 100644 index 0000000..351839a --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopPerformanceCounter.java @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.counter; + +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; + +import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*; + +/** + * Counter for the job statistics accumulation. + */ +public class HadoopPerformanceCounter extends HadoopCounterAdapter { + /** */ + private static final long serialVersionUID = 0L; + + /** The group name for this counter. */ + private static final String GROUP_NAME = "SYSTEM"; + + /** The counter name for this counter. */ + private static final String COUNTER_NAME = "PERFORMANCE"; + + /** Events collections. */ + private Collection<T2<String,Long>> evts = new ArrayList<>(); + + /** Node id to insert into the event info. */ + private UUID nodeId; + + /** */ + private int reducerNum; + + /** */ + private volatile Long firstShuffleMsg; + + /** */ + private volatile Long lastShuffleMsg; + + /** + * Default constructor required by {@link Externalizable}. + */ + public HadoopPerformanceCounter() { + // No-op. + } + + /** + * Constructor. + * + * @param grp Group name. + * @param name Counter name. + */ + public HadoopPerformanceCounter(String grp, String name) { + super(grp, name); + } + + /** + * Constructor to create instance to use this as helper. + * + * @param nodeId Id of the work node. + */ + public HadoopPerformanceCounter(UUID nodeId) { + this.nodeId = nodeId; + } + + /** {@inheritDoc} */ + @Override protected void writeValue(ObjectOutput out) throws IOException { + U.writeCollection(out, evts); + } + + /** {@inheritDoc} */ + @Override protected void readValue(ObjectInput in) throws IOException { + try { + evts = U.readCollection(in); + } + catch (ClassNotFoundException e) { + throw new IOException(e); + } + } + + /** {@inheritDoc} */ + @Override public void merge(HadoopCounter cntr) { + evts.addAll(((HadoopPerformanceCounter)cntr).evts); + } + + /** + * Gets the events collection. + * + * @return Collection of event. + */ + public Collection<T2<String, Long>> evts() { + return evts; + } + + /** + * Generate name that consists of some event information. + * + * @param info Task info. + * @param evtType The type of the event. + * @return String contains necessary event information. + */ + private String eventName(HadoopTaskInfo info, String evtType) { + return eventName(info.type().toString(), info.taskNumber(), evtType); + } + + /** + * Generate name that consists of some event information. + * + * @param taskType Task type. + * @param taskNum Number of the task. + * @param evtType The type of the event. + * @return String contains necessary event information. + */ + private String eventName(String taskType, int taskNum, String evtType) { + assert nodeId != null; + + return taskType + " " + taskNum + " " + evtType + " " + nodeId; + } + + /** + * Adds event of the task submission (task instance creation). + * + * @param info Task info. + * @param ts Timestamp of the event. + */ + public void onTaskSubmit(HadoopTaskInfo info, long ts) { + evts.add(new T2<>(eventName(info, "submit"), ts)); + } + + /** + * Adds event of the task preparation. + * + * @param info Task info. + * @param ts Timestamp of the event. + */ + public void onTaskPrepare(HadoopTaskInfo info, long ts) { + evts.add(new T2<>(eventName(info, "prepare"), ts)); + } + + /** + * Adds event of the task finish. + * + * @param info Task info. + * @param ts Timestamp of the event. + */ + public void onTaskFinish(HadoopTaskInfo info, long ts) { + if (info.type() == HadoopTaskType.REDUCE && lastShuffleMsg != null) { + evts.add(new T2<>(eventName("SHUFFLE", reducerNum, "start"), firstShuffleMsg)); + evts.add(new T2<>(eventName("SHUFFLE", reducerNum, "finish"), lastShuffleMsg)); + + lastShuffleMsg = null; + } + + evts.add(new T2<>(eventName(info, "finish"), ts)); + } + + /** + * Adds event of the task run. + * + * @param info Task info. + * @param ts Timestamp of the event. + */ + public void onTaskStart(HadoopTaskInfo info, long ts) { + evts.add(new T2<>(eventName(info, "start"), ts)); + } + + /** + * Adds event of the job preparation. + * + * @param ts Timestamp of the event. + */ + public void onJobPrepare(long ts) { + assert nodeId != null; + + evts.add(new T2<>("JOB prepare " + nodeId, ts)); + } + + /** + * Adds event of the job start. + * + * @param ts Timestamp of the event. + */ + public void onJobStart(long ts) { + assert nodeId != null; + + evts.add(new T2<>("JOB start " + nodeId, ts)); + } + + /** + * Adds client submission events from job info. + * + * @param info Job info. + */ + public void clientSubmissionEvents(HadoopJobInfo info) { + assert nodeId != null; + + addEventFromProperty("JOB requestId", info, REQ_NEW_JOBID_TS_PROPERTY); + addEventFromProperty("JOB responseId", info, RESPONSE_NEW_JOBID_TS_PROPERTY); + addEventFromProperty("JOB submit", info, JOB_SUBMISSION_START_TS_PROPERTY); + } + + /** + * Adds event with timestamp from some property in job info. + * + * @param evt Event type and phase. + * @param info Job info. + * @param propName Property name to get timestamp. + */ + private void addEventFromProperty(String evt, HadoopJobInfo info, String propName) { + String val = info.property(propName); + + if (!F.isEmpty(val)) { + try { + evts.add(new T2<>(evt + " " + nodeId, Long.parseLong(val))); + } + catch (NumberFormatException e) { + throw new IllegalStateException("Invalid value '" + val + "' of property '" + propName + "'", e); + } + } + } + + /** + * Registers shuffle message event. + * + * @param reducerNum Number of reducer that receives the data. + * @param ts Timestamp of the event. + */ + public void onShuffleMessage(int reducerNum, long ts) { + this.reducerNum = reducerNum; + + if (firstShuffleMsg == null) + firstShuffleMsg = ts; + + lastShuffleMsg = ts; + } + + /** + * Gets system predefined performance counter from the HadoopCounters object. + * + * @param cntrs HadoopCounters object. + * @param nodeId Node id for methods that adds events. It may be null if you don't use ones. + * @return Predefined performance counter. + */ + public static HadoopPerformanceCounter getCounter(HadoopCounters cntrs, @Nullable UUID nodeId) { + HadoopPerformanceCounter cntr = cntrs.counter(GROUP_NAME, COUNTER_NAME, HadoopPerformanceCounter.class); + + if (nodeId != null) + cntr.nodeId(nodeId); + + return cntrs.counter(GROUP_NAME, COUNTER_NAME, HadoopPerformanceCounter.class); + } + + /** + * Sets the nodeId field. + * + * @param nodeId Node id. + */ + private void nodeId(UUID nodeId) { + this.nodeId = nodeId; + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopDistributedFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopDistributedFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopDistributedFileSystem.java deleted file mode 100644 index e9461e2..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopDistributedFileSystem.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.fs; - -import org.apache.hadoop.conf.*; -import org.apache.hadoop.fs.*; -import org.apache.hadoop.hdfs.*; -import org.apache.hadoop.mapreduce.*; - -import java.io.*; -import java.net.*; - -import static org.apache.ignite.configuration.IgfsConfiguration.*; - -/** - * Wrapper of HDFS for support of separated working directory. - */ -public class GridHadoopDistributedFileSystem extends DistributedFileSystem { - /** User name for each thread. */ - private final ThreadLocal<String> userName = new ThreadLocal<String>() { - /** {@inheritDoc} */ - @Override protected String initialValue() { - return DFLT_USER_NAME; - } - }; - - /** Working directory for each thread. */ - private final ThreadLocal<Path> workingDir = new ThreadLocal<Path>() { - /** {@inheritDoc} */ - @Override protected Path initialValue() { - return getHomeDirectory(); - } - }; - - /** {@inheritDoc} */ - @Override public void initialize(URI uri, Configuration conf) throws IOException { - super.initialize(uri, conf); - - setUser(conf.get(MRJobConfig.USER_NAME, DFLT_USER_NAME)); - } - - /** - * Set user name and default working directory for current thread. - * - * @param userName User name. - */ - public void setUser(String userName) { - this.userName.set(userName); - - setWorkingDirectory(getHomeDirectory()); - } - - /** {@inheritDoc} */ - @Override public Path getHomeDirectory() { - Path path = new Path("/user/" + userName.get()); - - return path.makeQualified(getUri(), null); - } - - /** {@inheritDoc} */ - @Override public void setWorkingDirectory(Path dir) { - Path fixedDir = fixRelativePart(dir); - - String res = fixedDir.toUri().getPath(); - - if (!DFSUtil.isValidName(res)) - throw new IllegalArgumentException("Invalid DFS directory name " + res); - - workingDir.set(fixedDir); - } - - /** {@inheritDoc} */ - @Override public Path getWorkingDirectory() { - return workingDir.get(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopFileSystemsUtils.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopFileSystemsUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopFileSystemsUtils.java deleted file mode 100644 index 52e7d29..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopFileSystemsUtils.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.fs; - -import org.apache.hadoop.conf.*; -import org.apache.hadoop.fs.*; -import org.apache.hadoop.hdfs.protocol.*; -import org.apache.ignite.igfs.hadoop.v1.*; - -/** - * Utilities for configuring file systems to support the separate working directory per each thread. - */ -public class GridHadoopFileSystemsUtils { - /** Name of the property for setting working directory on create new local FS instance. */ - public static final String LOC_FS_WORK_DIR_PROP = "fs." + FsConstants.LOCAL_FS_URI.getScheme() + ".workDir"; - - /** - * Set user name and default working directory for current thread if it's supported by file system. - * - * @param fs File system. - * @param userName User name. - */ - public static void setUser(FileSystem fs, String userName) { - if (fs instanceof IgfsHadoopFileSystem) - ((IgfsHadoopFileSystem)fs).setUser(userName); - else if (fs instanceof GridHadoopDistributedFileSystem) - ((GridHadoopDistributedFileSystem)fs).setUser(userName); - } - - /** - * Setup wrappers of filesystems to support the separate working directory. - * - * @param cfg Config for setup. - */ - public static void setupFileSystems(Configuration cfg) { - cfg.set("fs." + FsConstants.LOCAL_FS_URI.getScheme() + ".impl", GridHadoopLocalFileSystemV1.class.getName()); - cfg.set("fs.AbstractFileSystem." + FsConstants.LOCAL_FS_URI.getScheme() + ".impl", - GridHadoopLocalFileSystemV2.class.getName()); - - cfg.set("fs." + HdfsConstants.HDFS_URI_SCHEME + ".impl", GridHadoopDistributedFileSystem.class.getName()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopLocalFileSystemV1.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopLocalFileSystemV1.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopLocalFileSystemV1.java deleted file mode 100644 index 28834d4..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopLocalFileSystemV1.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.fs; - -import org.apache.hadoop.fs.*; - -import java.io.*; - -/** - * Local file system replacement for Hadoop jobs. - */ -public class GridHadoopLocalFileSystemV1 extends LocalFileSystem { - /** - * Creates new local file system. - */ - public GridHadoopLocalFileSystemV1() { - super(new GridHadoopRawLocalFileSystem()); - } - - /** {@inheritDoc} */ - @Override public File pathToFile(Path path) { - return ((GridHadoopRawLocalFileSystem)getRaw()).convert(path); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopLocalFileSystemV2.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopLocalFileSystemV2.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopLocalFileSystemV2.java deleted file mode 100644 index 62d7cea..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopLocalFileSystemV2.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.fs; - -import org.apache.hadoop.conf.*; -import org.apache.hadoop.fs.*; -import org.apache.hadoop.fs.local.*; - -import java.io.*; -import java.net.*; - -import static org.apache.hadoop.fs.FsConstants.*; - -/** - * Local file system replacement for Hadoop jobs. - */ -public class GridHadoopLocalFileSystemV2 extends ChecksumFs { - /** - * Creates new local file system. - * - * @param cfg Configuration. - * @throws IOException If failed. - * @throws URISyntaxException If failed. - */ - public GridHadoopLocalFileSystemV2(Configuration cfg) throws IOException, URISyntaxException { - super(new DelegateFS(cfg)); - } - - /** - * Creates new local file system. - * - * @param uri URI. - * @param cfg Configuration. - * @throws IOException If failed. - * @throws URISyntaxException If failed. - */ - public GridHadoopLocalFileSystemV2(URI uri, Configuration cfg) throws IOException, URISyntaxException { - this(cfg); - } - - /** - * Delegate file system. - */ - private static class DelegateFS extends DelegateToFileSystem { - /** - * Creates new local file system. - * - * @param cfg Configuration. - * @throws IOException If failed. - * @throws URISyntaxException If failed. - */ - public DelegateFS(Configuration cfg) throws IOException, URISyntaxException { - super(LOCAL_FS_URI, new GridHadoopRawLocalFileSystem(), cfg, LOCAL_FS_URI.getScheme(), false); - } - - /** {@inheritDoc} */ - @Override public int getUriDefaultPort() { - return -1; - } - - /** {@inheritDoc} */ - @Override public FsServerDefaults getServerDefaults() throws IOException { - return LocalConfigKeys.getServerDefaults(); - } - - /** {@inheritDoc} */ - @Override public boolean isValidName(String src) { - return true; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopRawLocalFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopRawLocalFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopRawLocalFileSystem.java deleted file mode 100644 index 29645f8..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopRawLocalFileSystem.java +++ /dev/null @@ -1,304 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.fs; - -import org.apache.hadoop.conf.*; -import org.apache.hadoop.fs.*; -import org.apache.hadoop.fs.FileAlreadyExistsException; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.permission.*; -import org.apache.hadoop.util.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.io.*; -import java.net.*; -import java.nio.file.*; - -/** - * Local file system implementation for Hadoop. - */ -public class GridHadoopRawLocalFileSystem extends FileSystem { - /** Working directory for each thread. */ - private final ThreadLocal<Path> workDir = new ThreadLocal<Path>() { - @Override protected Path initialValue() { - return getInitialWorkingDirectory(); - } - }; - - /** - * Converts Hadoop path to local path. - * - * @param path Hadoop path. - * @return Local path. - */ - File convert(Path path) { - checkPath(path); - - if (path.isAbsolute()) - return new File(path.toUri().getPath()); - - return new File(getWorkingDirectory().toUri().getPath(), path.toUri().getPath()); - } - - /** {@inheritDoc} */ - @Override public Path getHomeDirectory() { - return makeQualified(new Path(System.getProperty("user.home"))); - } - - /** {@inheritDoc} */ - @Override public Path getInitialWorkingDirectory() { - File f = new File(System.getProperty("user.dir")); - - return new Path(f.getAbsoluteFile().toURI()).makeQualified(getUri(), null); - } - - /** {@inheritDoc} */ - @Override public void initialize(URI uri, Configuration conf) throws IOException { - super.initialize(uri, conf); - - setConf(conf); - - String initWorkDir = conf.get(GridHadoopFileSystemsUtils.LOC_FS_WORK_DIR_PROP); - - if (initWorkDir != null) - setWorkingDirectory(new Path(initWorkDir)); - } - - /** {@inheritDoc} */ - @Override public URI getUri() { - return FsConstants.LOCAL_FS_URI; - } - - /** {@inheritDoc} */ - @Override public FSDataInputStream open(Path f, int bufferSize) throws IOException { - return new FSDataInputStream(new InStream(checkExists(convert(f)))); - } - - /** {@inheritDoc} */ - @Override public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufSize, - short replication, long blockSize, Progressable progress) throws IOException { - File file = convert(f); - - if (!overwrite && !file.createNewFile()) - throw new IOException("Failed to create new file: " + f.toUri()); - - return out(file, false, bufSize); - } - - /** - * @param file File. - * @param append Append flag. - * @return Output stream. - * @throws IOException If failed. - */ - private FSDataOutputStream out(File file, boolean append, int bufSize) throws IOException { - return new FSDataOutputStream(new BufferedOutputStream(new FileOutputStream(file, append), - bufSize < 32 * 1024 ? 32 * 1024 : bufSize), new Statistics(getUri().getScheme())); - } - - /** {@inheritDoc} */ - @Override public FSDataOutputStream append(Path f, int bufSize, Progressable progress) throws IOException { - return out(convert(f), true, bufSize); - } - - /** {@inheritDoc} */ - @Override public boolean rename(Path src, Path dst) throws IOException { - return convert(src).renameTo(convert(dst)); - } - - /** {@inheritDoc} */ - @Override public boolean delete(Path f, boolean recursive) throws IOException { - File file = convert(f); - - if (file.isDirectory() && !recursive) - throw new IOException("Failed to remove directory in non recursive mode: " + f.toUri()); - - return U.delete(file); - } - - /** {@inheritDoc} */ - @Override public void setWorkingDirectory(Path dir) { - workDir.set(fixRelativePart(dir)); - - checkPath(dir); - } - - /** {@inheritDoc} */ - @Override public Path getWorkingDirectory() { - return workDir.get(); - } - - /** {@inheritDoc} */ - @Override public boolean mkdirs(Path f, FsPermission permission) throws IOException { - if(f == null) - throw new IllegalArgumentException("mkdirs path arg is null"); - - Path parent = f.getParent(); - - File p2f = convert(f); - - if(parent != null) { - File parent2f = convert(parent); - - if(parent2f != null && parent2f.exists() && !parent2f.isDirectory()) - throw new FileAlreadyExistsException("Parent path is not a directory: " + parent); - - } - - return (parent == null || mkdirs(parent)) && (p2f.mkdir() || p2f.isDirectory()); - } - - /** {@inheritDoc} */ - @Override public FileStatus getFileStatus(Path f) throws IOException { - return fileStatus(checkExists(convert(f))); - } - - /** - * @return File status. - */ - private FileStatus fileStatus(File file) throws IOException { - boolean dir = file.isDirectory(); - - java.nio.file.Path path = dir ? null : file.toPath(); - - return new FileStatus(dir ? 0 : file.length(), dir, 1, 4 * 1024, file.lastModified(), file.lastModified(), - /*permission*/null, /*owner*/null, /*group*/null, dir ? null : Files.isSymbolicLink(path) ? - new Path(Files.readSymbolicLink(path).toUri()) : null, new Path(file.toURI())); - } - - /** - * @param file File. - * @return Same file. - * @throws FileNotFoundException If does not exist. - */ - private static File checkExists(File file) throws FileNotFoundException { - if (!file.exists()) - throw new FileNotFoundException("File " + file.getAbsolutePath() + " does not exist."); - - return file; - } - - /** {@inheritDoc} */ - @Override public FileStatus[] listStatus(Path f) throws IOException { - File file = convert(f); - - if (checkExists(file).isFile()) - return new FileStatus[] {fileStatus(file)}; - - File[] files = file.listFiles(); - - FileStatus[] res = new FileStatus[files.length]; - - for (int i = 0; i < res.length; i++) - res[i] = fileStatus(files[i]); - - return res; - } - - /** {@inheritDoc} */ - @Override public boolean supportsSymlinks() { - return true; - } - - /** {@inheritDoc} */ - @Override public void createSymlink(Path target, Path link, boolean createParent) throws IOException { - Files.createSymbolicLink(convert(link).toPath(), convert(target).toPath()); - } - - /** {@inheritDoc} */ - @Override public FileStatus getFileLinkStatus(Path f) throws IOException { - return getFileStatus(getLinkTarget(f)); - } - - /** {@inheritDoc} */ - @Override public Path getLinkTarget(Path f) throws IOException { - File file = Files.readSymbolicLink(convert(f).toPath()).toFile(); - - return new Path(file.toURI()); - } - - /** - * Input stream. - */ - private static class InStream extends InputStream implements Seekable, PositionedReadable { - /** */ - private final RandomAccessFile file; - - /** - * @param f File. - * @throws IOException If failed. - */ - public InStream(File f) throws IOException { - file = new RandomAccessFile(f, "r"); - } - - /** {@inheritDoc} */ - @Override public synchronized int read() throws IOException { - return file.read(); - } - - /** {@inheritDoc} */ - @Override public synchronized int read(byte[] b, int off, int len) throws IOException { - return file.read(b, off, len); - } - - /** {@inheritDoc} */ - @Override public synchronized void close() throws IOException { - file.close(); - } - - /** {@inheritDoc} */ - @Override public synchronized int read(long pos, byte[] buf, int off, int len) throws IOException { - long pos0 = file.getFilePointer(); - - file.seek(pos); - int res = file.read(buf, off, len); - - file.seek(pos0); - - return res; - } - - /** {@inheritDoc} */ - @Override public void readFully(long pos, byte[] buf, int off, int len) throws IOException { - if (read(pos, buf, off, len) != len) - throw new IOException(); - } - - /** {@inheritDoc} */ - @Override public void readFully(long pos, byte[] buf) throws IOException { - readFully(pos, buf, 0, buf.length); - } - - /** {@inheritDoc} */ - @Override public synchronized void seek(long pos) throws IOException { - file.seek(pos); - } - - /** {@inheritDoc} */ - @Override public synchronized long getPos() throws IOException { - return file.getFilePointer(); - } - - /** {@inheritDoc} */ - @Override public boolean seekToNewSource(long targetPos) throws IOException { - return false; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopDistributedFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopDistributedFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopDistributedFileSystem.java new file mode 100644 index 0000000..509f443 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopDistributedFileSystem.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.fs; + +import org.apache.hadoop.conf.*; +import org.apache.hadoop.fs.*; +import org.apache.hadoop.hdfs.*; +import org.apache.hadoop.mapreduce.*; + +import java.io.*; +import java.net.*; + +import static org.apache.ignite.configuration.FileSystemConfiguration.*; + +/** + * Wrapper of HDFS for support of separated working directory. + */ +public class HadoopDistributedFileSystem extends DistributedFileSystem { + /** User name for each thread. */ + private final ThreadLocal<String> userName = new ThreadLocal<String>() { + /** {@inheritDoc} */ + @Override protected String initialValue() { + return DFLT_USER_NAME; + } + }; + + /** Working directory for each thread. */ + private final ThreadLocal<Path> workingDir = new ThreadLocal<Path>() { + /** {@inheritDoc} */ + @Override protected Path initialValue() { + return getHomeDirectory(); + } + }; + + /** {@inheritDoc} */ + @Override public void initialize(URI uri, Configuration conf) throws IOException { + super.initialize(uri, conf); + + setUser(conf.get(MRJobConfig.USER_NAME, DFLT_USER_NAME)); + } + + /** + * Set user name and default working directory for current thread. + * + * @param userName User name. + */ + public void setUser(String userName) { + this.userName.set(userName); + + setWorkingDirectory(getHomeDirectory()); + } + + /** {@inheritDoc} */ + @Override public Path getHomeDirectory() { + Path path = new Path("/user/" + userName.get()); + + return path.makeQualified(getUri(), null); + } + + /** {@inheritDoc} */ + @Override public void setWorkingDirectory(Path dir) { + Path fixedDir = fixRelativePart(dir); + + String res = fixedDir.toUri().getPath(); + + if (!DFSUtil.isValidName(res)) + throw new IllegalArgumentException("Invalid DFS directory name " + res); + + workingDir.set(fixedDir); + } + + /** {@inheritDoc} */ + @Override public Path getWorkingDirectory() { + return workingDir.get(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java new file mode 100644 index 0000000..f3f51d4 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.fs; + +import org.apache.hadoop.conf.*; +import org.apache.hadoop.fs.*; +import org.apache.hadoop.hdfs.protocol.*; +import org.apache.ignite.hadoop.fs.v1.*; + +/** + * Utilities for configuring file systems to support the separate working directory per each thread. + */ +public class HadoopFileSystemsUtils { + /** Name of the property for setting working directory on create new local FS instance. */ + public static final String LOC_FS_WORK_DIR_PROP = "fs." + FsConstants.LOCAL_FS_URI.getScheme() + ".workDir"; + + /** + * Set user name and default working directory for current thread if it's supported by file system. + * + * @param fs File system. + * @param userName User name. + */ + public static void setUser(FileSystem fs, String userName) { + if (fs instanceof IgniteHadoopFileSystem) + ((IgniteHadoopFileSystem)fs).setUser(userName); + else if (fs instanceof HadoopDistributedFileSystem) + ((HadoopDistributedFileSystem)fs).setUser(userName); + } + + /** + * Setup wrappers of filesystems to support the separate working directory. + * + * @param cfg Config for setup. + */ + public static void setupFileSystems(Configuration cfg) { + cfg.set("fs." + FsConstants.LOCAL_FS_URI.getScheme() + ".impl", HadoopLocalFileSystemV1.class.getName()); + cfg.set("fs.AbstractFileSystem." + FsConstants.LOCAL_FS_URI.getScheme() + ".impl", + HadoopLocalFileSystemV2.class.getName()); + + cfg.set("fs." + HdfsConstants.HDFS_URI_SCHEME + ".impl", HadoopDistributedFileSystem.class.getName()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV1.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV1.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV1.java new file mode 100644 index 0000000..9cc5881 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV1.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.fs; + +import org.apache.hadoop.fs.*; + +import java.io.*; + +/** + * Local file system replacement for Hadoop jobs. + */ +public class HadoopLocalFileSystemV1 extends LocalFileSystem { + /** + * Creates new local file system. + */ + public HadoopLocalFileSystemV1() { + super(new HadoopRawLocalFileSystem()); + } + + /** {@inheritDoc} */ + @Override public File pathToFile(Path path) { + return ((HadoopRawLocalFileSystem)getRaw()).convert(path); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV2.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV2.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV2.java new file mode 100644 index 0000000..15ddc5a --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV2.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.fs; + +import org.apache.hadoop.conf.*; +import org.apache.hadoop.fs.*; +import org.apache.hadoop.fs.local.*; + +import java.io.*; +import java.net.*; + +import static org.apache.hadoop.fs.FsConstants.*; + +/** + * Local file system replacement for Hadoop jobs. + */ +public class HadoopLocalFileSystemV2 extends ChecksumFs { + /** + * Creates new local file system. + * + * @param cfg Configuration. + * @throws IOException If failed. + * @throws URISyntaxException If failed. + */ + public HadoopLocalFileSystemV2(Configuration cfg) throws IOException, URISyntaxException { + super(new DelegateFS(cfg)); + } + + /** + * Creates new local file system. + * + * @param uri URI. + * @param cfg Configuration. + * @throws IOException If failed. + * @throws URISyntaxException If failed. + */ + public HadoopLocalFileSystemV2(URI uri, Configuration cfg) throws IOException, URISyntaxException { + this(cfg); + } + + /** + * Delegate file system. + */ + private static class DelegateFS extends DelegateToFileSystem { + /** + * Creates new local file system. + * + * @param cfg Configuration. + * @throws IOException If failed. + * @throws URISyntaxException If failed. + */ + public DelegateFS(Configuration cfg) throws IOException, URISyntaxException { + super(LOCAL_FS_URI, new HadoopRawLocalFileSystem(), cfg, LOCAL_FS_URI.getScheme(), false); + } + + /** {@inheritDoc} */ + @Override public int getUriDefaultPort() { + return -1; + } + + /** {@inheritDoc} */ + @Override public FsServerDefaults getServerDefaults() throws IOException { + return LocalConfigKeys.getServerDefaults(); + } + + /** {@inheritDoc} */ + @Override public boolean isValidName(String src) { + return true; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopParameters.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopParameters.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopParameters.java new file mode 100644 index 0000000..7edcec0 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopParameters.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.fs; + +/** + * This class lists parameters that can be specified in Hadoop configuration. + * Hadoop configuration can be specified in {@code core-site.xml} file + * or passed to map-reduce task directly when using Hadoop driver for IGFS file system: + * <ul> + * <li> + * {@code fs.igfs.[name].open.sequential_reads_before_prefetch} - this parameter overrides + * the one specified in {@link org.apache.ignite.configuration.FileSystemConfiguration#getSequentialReadsBeforePrefetch()} + * IGFS data node configuration property. + * </li> + * <li> + * {@code fs.igfs.[name].log.enabled} - specifies whether IGFS sampling logger is enabled. If + * {@code true}, then all file system operations will be logged to a file. + * </li> + * <li>{@code fs.igfs.[name].log.dir} - specifies log directory where sampling log files should be placed.</li> + * <li> + * {@code fs.igfs.[name].log.batch_size} - specifies how many log entries are accumulated in a batch before + * it gets flushed to log file. Higher values will imply greater performance, but will increase delay + * before record appears in the log file. + * </li> + * <li> + * {@code fs.igfs.[name].colocated.writes} - specifies whether written files should be colocated on data + * node to which client is connected. If {@code true}, file will not be distributed and will be written + * to a single data node. Default value is {@code true}. + * </li> + * <li> + * {@code fs.igfs.prefer.local.writes} - specifies whether file preferably should be written to + * local data node if it has enough free space. After some time it can be redistributed across nodes though. + * </li> + * </ul> + * Where {@code [name]} is file system endpoint which you specify in file system URI authority part. E.g. in + * case your file system URI is {@code igfs://127.0.0.1:10500} then {@code name} will be {@code 127.0.0.1:10500}. + * <p> + * Sample configuration that can be placed to {@code core-site.xml} file: + * <pre name="code" class="xml"> + * <property> + * <name>fs.igfs.127.0.0.1:10500.log.enabled</name> + * <value>true</value> + * </property> + * <property> + * <name>fs.igfs.127.0.0.1:10500.log.dir</name> + * <value>/home/apache/ignite/log/sampling</value> + * </property> + * <property> + * <name>fs.igfs.127.0.0.1:10500.log.batch_size</name> + * <value>16</value> + * </property> + * </pre> + * Parameters could also be specified per mapreduce job, e.g. + * <pre name="code" class="bash"> + * hadoop jar myjarfile.jar MyMapReduceJob -Dfs.igfs.open.sequential_reads_before_prefetch=4 + * </pre> + * If you want to use these parameters in code, then you have to substitute you file system name in it. The easiest + * way to do that is {@code String.format(PARAM_IGFS_COLOCATED_WRITES, [name])}. + */ +public class HadoopParameters { + /** Parameter name for control over file colocation write mode. */ + public static final String PARAM_IGFS_COLOCATED_WRITES = "fs.igfs.%s.colocated.writes"; + + /** Parameter name for custom sequential reads before prefetch value. */ + public static final String PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH = + "fs.igfs.%s.open.sequential_reads_before_prefetch"; + + /** Parameter name for client logger directory. */ + public static final String PARAM_IGFS_LOG_DIR = "fs.igfs.%s.log.dir"; + + /** Parameter name for log batch size. */ + public static final String PARAM_IGFS_LOG_BATCH_SIZE = "fs.igfs.%s.log.batch_size"; + + /** Parameter name for log enabled flag. */ + public static final String PARAM_IGFS_LOG_ENABLED = "fs.igfs.%s.log.enabled"; + + /** Parameter name for prefer local writes flag. */ + public static final String PARAM_IGFS_PREFER_LOCAL_WRITES = "fs.igfs.prefer.local.writes"; +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopRawLocalFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopRawLocalFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopRawLocalFileSystem.java new file mode 100644 index 0000000..e5ec3f7 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopRawLocalFileSystem.java @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.fs; + +import org.apache.hadoop.conf.*; +import org.apache.hadoop.fs.*; +import org.apache.hadoop.fs.FileAlreadyExistsException; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.*; +import org.apache.hadoop.util.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.io.*; +import java.net.*; +import java.nio.file.*; + +/** + * Local file system implementation for Hadoop. + */ +public class HadoopRawLocalFileSystem extends FileSystem { + /** Working directory for each thread. */ + private final ThreadLocal<Path> workDir = new ThreadLocal<Path>() { + @Override protected Path initialValue() { + return getInitialWorkingDirectory(); + } + }; + + /** + * Converts Hadoop path to local path. + * + * @param path Hadoop path. + * @return Local path. + */ + File convert(Path path) { + checkPath(path); + + if (path.isAbsolute()) + return new File(path.toUri().getPath()); + + return new File(getWorkingDirectory().toUri().getPath(), path.toUri().getPath()); + } + + /** {@inheritDoc} */ + @Override public Path getHomeDirectory() { + return makeQualified(new Path(System.getProperty("user.home"))); + } + + /** {@inheritDoc} */ + @Override public Path getInitialWorkingDirectory() { + File f = new File(System.getProperty("user.dir")); + + return new Path(f.getAbsoluteFile().toURI()).makeQualified(getUri(), null); + } + + /** {@inheritDoc} */ + @Override public void initialize(URI uri, Configuration conf) throws IOException { + super.initialize(uri, conf); + + setConf(conf); + + String initWorkDir = conf.get(HadoopFileSystemsUtils.LOC_FS_WORK_DIR_PROP); + + if (initWorkDir != null) + setWorkingDirectory(new Path(initWorkDir)); + } + + /** {@inheritDoc} */ + @Override public URI getUri() { + return FsConstants.LOCAL_FS_URI; + } + + /** {@inheritDoc} */ + @Override public FSDataInputStream open(Path f, int bufferSize) throws IOException { + return new FSDataInputStream(new InStream(checkExists(convert(f)))); + } + + /** {@inheritDoc} */ + @Override public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufSize, + short replication, long blockSize, Progressable progress) throws IOException { + File file = convert(f); + + if (!overwrite && !file.createNewFile()) + throw new IOException("Failed to create new file: " + f.toUri()); + + return out(file, false, bufSize); + } + + /** + * @param file File. + * @param append Append flag. + * @return Output stream. + * @throws IOException If failed. + */ + private FSDataOutputStream out(File file, boolean append, int bufSize) throws IOException { + return new FSDataOutputStream(new BufferedOutputStream(new FileOutputStream(file, append), + bufSize < 32 * 1024 ? 32 * 1024 : bufSize), new Statistics(getUri().getScheme())); + } + + /** {@inheritDoc} */ + @Override public FSDataOutputStream append(Path f, int bufSize, Progressable progress) throws IOException { + return out(convert(f), true, bufSize); + } + + /** {@inheritDoc} */ + @Override public boolean rename(Path src, Path dst) throws IOException { + return convert(src).renameTo(convert(dst)); + } + + /** {@inheritDoc} */ + @Override public boolean delete(Path f, boolean recursive) throws IOException { + File file = convert(f); + + if (file.isDirectory() && !recursive) + throw new IOException("Failed to remove directory in non recursive mode: " + f.toUri()); + + return U.delete(file); + } + + /** {@inheritDoc} */ + @Override public void setWorkingDirectory(Path dir) { + workDir.set(fixRelativePart(dir)); + + checkPath(dir); + } + + /** {@inheritDoc} */ + @Override public Path getWorkingDirectory() { + return workDir.get(); + } + + /** {@inheritDoc} */ + @Override public boolean mkdirs(Path f, FsPermission permission) throws IOException { + if(f == null) + throw new IllegalArgumentException("mkdirs path arg is null"); + + Path parent = f.getParent(); + + File p2f = convert(f); + + if(parent != null) { + File parent2f = convert(parent); + + if(parent2f != null && parent2f.exists() && !parent2f.isDirectory()) + throw new FileAlreadyExistsException("Parent path is not a directory: " + parent); + + } + + return (parent == null || mkdirs(parent)) && (p2f.mkdir() || p2f.isDirectory()); + } + + /** {@inheritDoc} */ + @Override public FileStatus getFileStatus(Path f) throws IOException { + return fileStatus(checkExists(convert(f))); + } + + /** + * @return File status. + */ + private FileStatus fileStatus(File file) throws IOException { + boolean dir = file.isDirectory(); + + java.nio.file.Path path = dir ? null : file.toPath(); + + return new FileStatus(dir ? 0 : file.length(), dir, 1, 4 * 1024, file.lastModified(), file.lastModified(), + /*permission*/null, /*owner*/null, /*group*/null, dir ? null : Files.isSymbolicLink(path) ? + new Path(Files.readSymbolicLink(path).toUri()) : null, new Path(file.toURI())); + } + + /** + * @param file File. + * @return Same file. + * @throws FileNotFoundException If does not exist. + */ + private static File checkExists(File file) throws FileNotFoundException { + if (!file.exists()) + throw new FileNotFoundException("File " + file.getAbsolutePath() + " does not exist."); + + return file; + } + + /** {@inheritDoc} */ + @Override public FileStatus[] listStatus(Path f) throws IOException { + File file = convert(f); + + if (checkExists(file).isFile()) + return new FileStatus[] {fileStatus(file)}; + + File[] files = file.listFiles(); + + FileStatus[] res = new FileStatus[files.length]; + + for (int i = 0; i < res.length; i++) + res[i] = fileStatus(files[i]); + + return res; + } + + /** {@inheritDoc} */ + @Override public boolean supportsSymlinks() { + return true; + } + + /** {@inheritDoc} */ + @Override public void createSymlink(Path target, Path link, boolean createParent) throws IOException { + Files.createSymbolicLink(convert(link).toPath(), convert(target).toPath()); + } + + /** {@inheritDoc} */ + @Override public FileStatus getFileLinkStatus(Path f) throws IOException { + return getFileStatus(getLinkTarget(f)); + } + + /** {@inheritDoc} */ + @Override public Path getLinkTarget(Path f) throws IOException { + File file = Files.readSymbolicLink(convert(f).toPath()).toFile(); + + return new Path(file.toURI()); + } + + /** + * Input stream. + */ + private static class InStream extends InputStream implements Seekable, PositionedReadable { + /** */ + private final RandomAccessFile file; + + /** + * @param f File. + * @throws IOException If failed. + */ + public InStream(File f) throws IOException { + file = new RandomAccessFile(f, "r"); + } + + /** {@inheritDoc} */ + @Override public synchronized int read() throws IOException { + return file.read(); + } + + /** {@inheritDoc} */ + @Override public synchronized int read(byte[] b, int off, int len) throws IOException { + return file.read(b, off, len); + } + + /** {@inheritDoc} */ + @Override public synchronized void close() throws IOException { + file.close(); + } + + /** {@inheritDoc} */ + @Override public synchronized int read(long pos, byte[] buf, int off, int len) throws IOException { + long pos0 = file.getFilePointer(); + + file.seek(pos); + int res = file.read(buf, off, len); + + file.seek(pos0); + + return res; + } + + /** {@inheritDoc} */ + @Override public void readFully(long pos, byte[] buf, int off, int len) throws IOException { + if (read(pos, buf, off, len) != len) + throw new IOException(); + } + + /** {@inheritDoc} */ + @Override public void readFully(long pos, byte[] buf) throws IOException { + readFully(pos, buf, 0, buf.length); + } + + /** {@inheritDoc} */ + @Override public synchronized void seek(long pos) throws IOException { + file.seek(pos); + } + + /** {@inheritDoc} */ + @Override public synchronized long getPos() throws IOException { + return file.getFilePointer(); + } + + /** {@inheritDoc} */ + @Override public boolean seekToNewSource(long targetPos) throws IOException { + return false; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfs.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfs.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfs.java new file mode 100644 index 0000000..b3cb235 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfs.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.igfs; + +import org.apache.ignite.*; +import org.apache.ignite.igfs.*; +import org.apache.ignite.internal.processors.igfs.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; + +/** + * Facade for communication with grid. + */ +public interface HadoopIgfs { + /** + * Perform handshake. + * + * @param logDir Log directory. + * @return Future with handshake result. + * @throws IgniteCheckedException If failed. + */ + public IgfsHandshakeResponse handshake(String logDir) throws IgniteCheckedException, IOException; + + /** + * Close connection. + * + * @param force Force flag. + */ + public void close(boolean force); + + /** + * Command to retrieve file info for some IGFS path. + * + * @param path Path to get file info for. + * @return Future for info operation. + * @throws IgniteCheckedException If failed. + */ + public IgfsFile info(IgfsPath path) throws IgniteCheckedException, IOException; + + /** + * Command to update file properties. + * + * @param path IGFS path to update properties. + * @param props Properties to update. + * @return Future for update operation. + * @throws IgniteCheckedException If failed. + */ + public IgfsFile update(IgfsPath path, Map<String, String> props) throws IgniteCheckedException, IOException; + + /** + * Sets last access time and last modification time for a file. + * + * @param path Path to update times. + * @param accessTime Last access time to set. + * @param modificationTime Last modification time to set. + * @throws IgniteCheckedException If failed. + */ + public Boolean setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteCheckedException, + IOException; + + /** + * Command to rename given path. + * + * @param src Source path. + * @param dest Destination path. + * @return Future for rename operation. + * @throws IgniteCheckedException If failed. + */ + public Boolean rename(IgfsPath src, IgfsPath dest) throws IgniteCheckedException, IOException; + + /** + * Command to delete given path. + * + * @param path Path to delete. + * @param recursive {@code True} if deletion is recursive. + * @return Future for delete operation. + * @throws IgniteCheckedException If failed. + */ + public Boolean delete(IgfsPath path, boolean recursive) throws IgniteCheckedException, IOException; + + /** + * Command to get affinity for given path, offset and length. + * + * @param path Path to get affinity for. + * @param start Start position (offset). + * @param len Data length. + * @return Future for affinity command. + * @throws IgniteCheckedException If failed. + */ + public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len) throws IgniteCheckedException, + IOException; + + /** + * Gets path summary. + * + * @param path Path to get summary for. + * @return Future that will be completed when summary is received. + * @throws IgniteCheckedException If failed. + */ + public IgfsPathSummary contentSummary(IgfsPath path) throws IgniteCheckedException, IOException; + + /** + * Command to create directories. + * + * @param path Path to create. + * @return Future for mkdirs operation. + * @throws IgniteCheckedException If failed. + */ + public Boolean mkdirs(IgfsPath path, Map<String, String> props) throws IgniteCheckedException, IOException; + + /** + * Command to get list of files in directory. + * + * @param path Path to list. + * @return Future for listFiles operation. + * @throws IgniteCheckedException If failed. + */ + public Collection<IgfsFile> listFiles(IgfsPath path) throws IgniteCheckedException, IOException; + + /** + * Command to get directory listing. + * + * @param path Path to list. + * @return Future for listPaths operation. + * @throws IgniteCheckedException If failed. + */ + public Collection<IgfsPath> listPaths(IgfsPath path) throws IgniteCheckedException, IOException; + + /** + * Performs status request. + * + * @return Status response. + * @throws IgniteCheckedException If failed. + */ + public IgfsStatus fsStatus() throws IgniteCheckedException, IOException; + + /** + * Command to open file for reading. + * + * @param path File path to open. + * @return Future for open operation. + * @throws IgniteCheckedException If failed. + */ + public HadoopIgfsStreamDelegate open(IgfsPath path) throws IgniteCheckedException, IOException; + + /** + * Command to open file for reading. + * + * @param path File path to open. + * @return Future for open operation. + * @throws IgniteCheckedException If failed. + */ + public HadoopIgfsStreamDelegate open(IgfsPath path, int seqReadsBeforePrefetch) throws IgniteCheckedException, + IOException; + + /** + * Command to create file and open it for output. + * + * @param path Path to file. + * @param overwrite If {@code true} then old file contents will be lost. + * @param colocate If {@code true} and called on data node, file will be written on that node. + * @param replication Replication factor. + * @param props File properties for creation. + * @return Stream descriptor. + * @throws IgniteCheckedException If failed. + */ + public HadoopIgfsStreamDelegate create(IgfsPath path, boolean overwrite, boolean colocate, + int replication, long blockSize, @Nullable Map<String, String> props) throws IgniteCheckedException, IOException; + + /** + * Open file for output appending data to the end of a file. + * + * @param path Path to file. + * @param create If {@code true}, file will be created if does not exist. + * @param props File properties. + * @return Stream descriptor. + * @throws IgniteCheckedException If failed. + */ + public HadoopIgfsStreamDelegate append(IgfsPath path, boolean create, + @Nullable Map<String, String> props) throws IgniteCheckedException, IOException; +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsCommunicationException.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsCommunicationException.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsCommunicationException.java new file mode 100644 index 0000000..ff69478 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsCommunicationException.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.igfs; + +import org.apache.ignite.*; + +/** + * Communication exception indicating a problem between file system and IGFS instance. + */ +public class HadoopIgfsCommunicationException extends IgniteCheckedException { + /** */ + private static final long serialVersionUID = 0L; + + /** + * Creates new exception with given throwable as a nested cause and + * source of error message. + * + * @param cause Non-null throwable cause. + */ + public HadoopIgfsCommunicationException(Exception cause) { + super(cause); + } + + /** + * Creates a new exception with given error message and optional nested cause exception. + * + * @param msg Error message. + */ + public HadoopIgfsCommunicationException(String msg) { + super(msg); + } + + /** + * Creates a new exception with given error message and optional nested cause exception. + * + * @param msg Error message. + * @param cause Cause. + */ + public HadoopIgfsCommunicationException(String msg, Exception cause) { + super(msg, cause); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEndpoint.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEndpoint.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEndpoint.java new file mode 100644 index 0000000..7502f57 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEndpoint.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.igfs; + +import org.apache.ignite.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.net.*; + +import static org.apache.ignite.configuration.FileSystemConfiguration.*; + +/** + * IGFS endpoint abstraction. + */ +public class HadoopIgfsEndpoint { + /** Localhost. */ + public static final String LOCALHOST = "127.0.0.1"; + + /** IGFS name. */ + private final String igfsName; + + /** Grid name. */ + private final String gridName; + + /** Host. */ + private final String host; + + /** Port. */ + private final int port; + + /** + * Normalize IGFS URI. + * + * @param uri URI. + * @return Normalized URI. + * @throws IOException If failed. + */ + public static URI normalize(URI uri) throws IOException { + try { + if (!F.eq(IgniteFileSystem.IGFS_SCHEME, uri.getScheme())) + throw new IOException("Failed to normalize UIR because it has non IGFS scheme: " + uri); + + HadoopIgfsEndpoint endpoint = new HadoopIgfsEndpoint(uri.getAuthority()); + + StringBuilder sb = new StringBuilder(); + + if (endpoint.igfs() != null) + sb.append(endpoint.igfs()); + + if (endpoint.grid() != null) + sb.append(":").append(endpoint.grid()); + + return new URI(uri.getScheme(), sb.length() != 0 ? sb.toString() : null, endpoint.host(), endpoint.port(), + uri.getPath(), uri.getQuery(), uri.getFragment()); + } + catch (URISyntaxException | IgniteCheckedException e) { + throw new IOException("Failed to normalize URI: " + uri, e); + } + } + + /** + * Constructor. + * + * @param connStr Connection string. + * @throws IgniteCheckedException If failed to parse connection string. + */ + public HadoopIgfsEndpoint(@Nullable String connStr) throws IgniteCheckedException { + if (connStr == null) + connStr = ""; + + String[] tokens = connStr.split("@", -1); + + IgniteBiTuple<String, Integer> hostPort; + + if (tokens.length == 1) { + igfsName = null; + gridName = null; + + hostPort = hostPort(connStr, connStr); + } + else if (tokens.length == 2) { + String authStr = tokens[0]; + + if (authStr.isEmpty()) { + gridName = null; + igfsName = null; + } + else { + String[] authTokens = authStr.split(":", -1); + + igfsName = F.isEmpty(authTokens[0]) ? null : authTokens[0]; + + if (authTokens.length == 1) + gridName = null; + else if (authTokens.length == 2) + gridName = F.isEmpty(authTokens[1]) ? null : authTokens[1]; + else + throw new IgniteCheckedException("Invalid connection string format: " + connStr); + } + + hostPort = hostPort(connStr, tokens[1]); + } + else + throw new IgniteCheckedException("Invalid connection string format: " + connStr); + + host = hostPort.get1(); + + assert hostPort.get2() != null; + + port = hostPort.get2(); + } + + /** + * Parse host and port. + * + * @param connStr Full connection string. + * @param hostPortStr Host/port connection string part. + * @return Tuple with host and port. + * @throws IgniteCheckedException If failed to parse connection string. + */ + private IgniteBiTuple<String, Integer> hostPort(String connStr, String hostPortStr) throws IgniteCheckedException { + String[] tokens = hostPortStr.split(":", -1); + + String host = tokens[0]; + + if (F.isEmpty(host)) + host = LOCALHOST; + + int port; + + if (tokens.length == 1) + port = DFLT_IPC_PORT; + else if (tokens.length == 2) { + String portStr = tokens[1]; + + try { + port = Integer.valueOf(portStr); + + if (port < 0 || port > 65535) + throw new IgniteCheckedException("Invalid port number: " + connStr); + } + catch (NumberFormatException e) { + throw new IgniteCheckedException("Invalid port number: " + connStr); + } + } + else + throw new IgniteCheckedException("Invalid connection string format: " + connStr); + + return F.t(host, port); + } + + /** + * @return IGFS name. + */ + @Nullable public String igfs() { + return igfsName; + } + + /** + * @return Grid name. + */ + @Nullable public String grid() { + return gridName; + } + + /** + * @return Host. + */ + public String host() { + return host; + } + + /** + * @return Host. + */ + public boolean isLocal() { + return F.eq(LOCALHOST, host); + } + + /** + * @return Port. + */ + public int port() { + return port; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(HadoopIgfsEndpoint.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java new file mode 100644 index 0000000..2200e78 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.igfs; + +import org.apache.ignite.internal.util.lang.*; +import org.jetbrains.annotations.*; + +import java.io.*; + +/** + * Extended IGFS server interface. + */ +public interface HadoopIgfsEx extends HadoopIgfs { + /** + * Adds event listener that will be invoked when connection with server is lost or remote error has occurred. + * If connection is closed already, callback will be invoked synchronously inside this method. + * + * @param delegate Stream delegate. + * @param lsnr Event listener. + */ + public void addEventListener(HadoopIgfsStreamDelegate delegate, HadoopIgfsStreamEventListener lsnr); + + /** + * Removes event listener that will be invoked when connection with server is lost or remote error has occurred. + * + * @param delegate Stream delegate. + */ + public void removeEventListener(HadoopIgfsStreamDelegate delegate); + + /** + * Asynchronously reads specified amount of bytes from opened input stream. + * + * @param delegate Stream delegate. + * @param pos Position to read from. + * @param len Data length to read. + * @param outBuf Optional output buffer. If buffer length is less then {@code len}, all remaining + * bytes will be read into new allocated buffer of length {len - outBuf.length} and this buffer will + * be the result of read future. + * @param outOff Output offset. + * @param outLen Output length. + * @return Read data. + */ + public GridPlainFuture<byte[]> readData(HadoopIgfsStreamDelegate delegate, long pos, int len, + @Nullable final byte[] outBuf, final int outOff, final int outLen); + + /** + * Writes data to the stream with given streamId. This method does not return any future since + * no response to write request is sent. + * + * @param delegate Stream delegate. + * @param data Data to write. + * @param off Offset. + * @param len Length. + * @throws IOException If failed. + */ + public void writeData(HadoopIgfsStreamDelegate delegate, byte[] data, int off, int len) throws IOException; + + /** + * Close server stream. + * + * @param delegate Stream delegate. + * @throws IOException If failed. + */ + public void closeStream(HadoopIgfsStreamDelegate delegate) throws IOException; + + /** + * Flush output stream. + * + * @param delegate Stream delegate. + * @throws IOException If failed. + */ + public void flush(HadoopIgfsStreamDelegate delegate) throws IOException; +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsFuture.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsFuture.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsFuture.java new file mode 100644 index 0000000..59a8f49 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsFuture.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.igfs; + +import org.apache.ignite.internal.util.lang.*; +import org.jetbrains.annotations.*; + +/** + * IGFS client future that holds response parse closure. + */ +public class HadoopIgfsFuture<T> extends GridPlainFutureAdapter<T> { + /** Output buffer. */ + private byte[] outBuf; + + /** Output offset. */ + private int outOff; + + /** Output length. */ + private int outLen; + + /** Read future flag. */ + private boolean read; + + /** + * @return Output buffer. + */ + public byte[] outputBuffer() { + return outBuf; + } + + /** + * @param outBuf Output buffer. + */ + public void outputBuffer(@Nullable byte[] outBuf) { + this.outBuf = outBuf; + } + + /** + * @return Offset in output buffer to write from. + */ + public int outputOffset() { + return outOff; + } + + /** + * @param outOff Offset in output buffer to write from. + */ + public void outputOffset(int outOff) { + this.outOff = outOff; + } + + /** + * @return Length to write to output buffer. + */ + public int outputLength() { + return outLen; + } + + /** + * @param outLen Length to write to output buffer. + */ + public void outputLength(int outLen) { + this.outLen = outLen; + } + + /** + * @param read {@code True} if this is a read future. + */ + public void read(boolean read) { + this.read = read; + } + + /** + * @return {@code True} if this is a read future. + */ + public boolean read() { + return read; + } +}