http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProperties.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProperties.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProperties.java new file mode 100644 index 0000000..54e87dd --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProperties.java @@ -0,0 +1,88 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.hadoop.igfs;

import org.apache.hadoop.fs.permission.*;
import org.apache.ignite.*;

import java.util.*;

import static org.apache.ignite.internal.processors.igfs.IgfsEx.*;

/**
 * Hadoop file system properties.
 * <p>
 * Holds the user name, group name and permission extracted from a properties map. All three values
 * are read once in the constructor and never change afterwards.
 */
public class HadoopIgfsProperties {
    /** Username. */
    private final String usrName;

    /** Group name. */
    private final String grpName;

    /** Permissions ({@code null} when the map has no permission entry). */
    private final FsPermission perm;

    /**
     * Constructor.
     *
     * @param props Properties. The user name, group name and permission are looked up under the
     *      {@code PROP_USER_NAME}, {@code PROP_GROUP_NAME} and {@code PROP_PERMISSION} keys.
     * @throws IgniteException If the permission entry is present but is not a valid octal number.
     */
    public HadoopIgfsProperties(Map<String, String> props) throws IgniteException {
        usrName = props.get(PROP_USER_NAME);
        grpName = props.get(PROP_GROUP_NAME);

        String permStr = props.get(PROP_PERMISSION);

        FsPermission perm0 = null;

        if (permStr != null) {
            try {
                // Permission is stored as an octal string (radix 8).
                perm0 = new FsPermission((short)Integer.parseInt(permStr, 8));
            }
            catch (NumberFormatException e) {
                // Keep the original parse failure as the cause instead of discarding it.
                throw new IgniteException("Permissions cannot be parsed: " + permStr, e);
            }
        }

        perm = perm0;
    }

    /**
     * Get user name.
     *
     * @return User name, or {@code null} if the properties map had no such entry.
     */
    public String userName() {
        return usrName;
    }

    /**
     * Get group name.
     *
     * @return Group name, or {@code null} if the properties map had no such entry.
     */
    public String groupName() {
        return grpName;
    }

    /**
     * Get permission.
     *
     * @return Permission, or {@code null} if the properties map had no permission entry.
     */
    public FsPermission permission() {
        return perm;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProxyInputStream.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProxyInputStream.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProxyInputStream.java new file mode 100644 index 0000000..4530e64 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProxyInputStream.java @@ -0,0 +1,335 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.hadoop.igfs;

import org.apache.hadoop.fs.*;
import org.apache.ignite.internal.igfs.common.*;

import java.io.*;

/**
 * Secondary Hadoop file system input stream wrapper.
 * <p>
 * Every operation is delegated to the wrapped {@link FSDataInputStream}. Around each delegated call
 * the wrapper measures two durations: time spent inside the wrapped stream ({@code readTime}) and
 * time spent between calls ({@code userTime}). It also counts the bytes read. These figures are
 * reported to the client logger when the stream is closed, provided logging is enabled.
 * <p>
 * All public methods are {@code synchronized} on this instance.
 */
public class HadoopIgfsProxyInputStream extends InputStream implements Seekable, PositionedReadable {
    /** Actual input stream to the secondary file system. */
    private final FSDataInputStream is;

    /** Client logger. */
    private final IgfsLogger clientLog;

    /** Log stream ID. */
    private final long logStreamId;

    /** Read time: nanoseconds spent inside calls to the wrapped stream. */
    private long readTime;

    /** User time: nanoseconds elapsed between calls to the wrapped stream. */
    private long userTime;

    /** Last timestamp ({@link System#nanoTime()}) taken by {@link #readStart()} or {@link #readEnd()}. */
    private long lastTs;

    /** Amount of read bytes. */
    private long total;

    /** Closed flag. */
    private boolean closed;

    /**
     * Constructor.
     *
     * @param is Actual input stream to the secondary file system.
     * @param clientLog Client log.
     * @param logStreamId Log stream ID passed to every client log call.
     */
    public HadoopIgfsProxyInputStream(FSDataInputStream is, IgfsLogger clientLog, long logStreamId) {
        assert is != null;
        assert clientLog != null;

        this.is = is;
        this.clientLog = clientLog;
        this.logStreamId = logStreamId;

        lastTs = System.nanoTime();
    }

    /** {@inheritDoc} */
    @Override public synchronized int read(byte[] b) throws IOException {
        readStart();

        int res;

        try {
            res = is.read(b);
        }
        finally {
            readEnd();
        }

        if (res != -1)
            total += res;

        return res;
    }

    /** {@inheritDoc} */
    @Override public synchronized int read(byte[] b, int off, int len) throws IOException {
        readStart();

        int res;

        try {
            // Delegate to the wrapped stream. InputStream.read(byte[], int, int) would loop over this
            // class' own read(), which already increments 'total' per byte, so the bytes would be
            // counted twice and the wrapped stream's bulk read would never be used.
            res = is.read(b, off, len);
        }
        finally {
            readEnd();
        }

        if (res != -1)
            total += res;

        return res;
    }

    /** {@inheritDoc} */
    @Override public synchronized long skip(long n) throws IOException {
        readStart();

        long res;

        try {
            res = is.skip(n);
        }
        finally {
            readEnd();
        }

        if (clientLog.isLogEnabled())
            clientLog.logSkip(logStreamId, res);

        return res;
    }

    /** {@inheritDoc} */
    @Override public synchronized int available() throws IOException {
        readStart();

        try {
            return is.available();
        }
        finally {
            readEnd();
        }
    }

    /**
     * Closes the wrapped stream and, if client logging is enabled, reports the accumulated user time,
     * read time and total bytes read. Second and subsequent calls are no-ops.
     *
     * @throws IOException If the wrapped stream fails to close.
     */
    @Override public synchronized void close() throws IOException {
        if (!closed) {
            // Flag is raised before the delegate call, so a failed close is not retried.
            closed = true;

            readStart();

            try {
                is.close();
            }
            finally {
                readEnd();
            }

            if (clientLog.isLogEnabled())
                clientLog.logCloseIn(logStreamId, userTime, readTime, total);
        }
    }

    /** {@inheritDoc} */
    @Override public synchronized void mark(int readLimit) {
        readStart();

        try {
            is.mark(readLimit);
        }
        finally {
            readEnd();
        }

        if (clientLog.isLogEnabled())
            clientLog.logMark(logStreamId, readLimit);
    }

    /** {@inheritDoc} */
    @Override public synchronized void reset() throws IOException {
        readStart();

        try {
            is.reset();
        }
        finally {
            readEnd();
        }

        if (clientLog.isLogEnabled())
            clientLog.logReset(logStreamId);
    }

    /** {@inheritDoc} */
    @Override public synchronized boolean markSupported() {
        readStart();

        try {
            return is.markSupported();
        }
        finally {
            readEnd();
        }
    }

    /** {@inheritDoc} */
    @Override public synchronized int read() throws IOException {
        readStart();

        int res;

        try {
            res = is.read();
        }
        finally {
            readEnd();
        }

        if (res != -1)
            total++;

        return res;
    }

    /** {@inheritDoc} */
    @Override public synchronized int read(long pos, byte[] buf, int off, int len) throws IOException {
        readStart();

        int res;

        try {
            res = is.read(pos, buf, off, len);
        }
        finally {
            readEnd();
        }

        if (res != -1)
            total += res;

        if (clientLog.isLogEnabled())
            clientLog.logRandomRead(logStreamId, pos, res);

        return res;
    }

    /** {@inheritDoc} */
    @Override public synchronized void readFully(long pos, byte[] buf, int off, int len) throws IOException {
        readStart();

        try {
            is.readFully(pos, buf, off, len);
        }
        finally {
            readEnd();
        }

        // Reached only when the delegate returned normally, i.e. all 'len' bytes were read.
        total += len;

        if (clientLog.isLogEnabled())
            clientLog.logRandomRead(logStreamId, pos, len);
    }

    /** {@inheritDoc} */
    @Override public synchronized void readFully(long pos, byte[] buf) throws IOException {
        readStart();

        try {
            is.readFully(pos, buf);
        }
        finally {
            readEnd();
        }

        // Reached only when the delegate returned normally, i.e. the whole buffer was filled.
        total += buf.length;

        if (clientLog.isLogEnabled())
            clientLog.logRandomRead(logStreamId, pos, buf.length);
    }

    /** {@inheritDoc} */
    @Override public synchronized void seek(long pos) throws IOException {
        readStart();

        try {
            is.seek(pos);
        }
        finally {
            readEnd();
        }

        if (clientLog.isLogEnabled())
            clientLog.logSeek(logStreamId, pos);
    }

    /** {@inheritDoc} */
    @Override public synchronized long getPos() throws IOException {
        readStart();

        try {
            return is.getPos();
        }
        finally {
            readEnd();
        }
    }

    /** {@inheritDoc} */
    @Override public synchronized boolean seekToNewSource(long targetPos) throws IOException {
        readStart();

        try {
            return is.seekToNewSource(targetPos);
        }
        finally {
            readEnd();
        }
    }

    /**
     * Read start: attributes the time elapsed since the last timestamp to user time.
     */
    private void readStart() {
        long now = System.nanoTime();

        userTime += now - lastTs;

        lastTs = now;
    }

    /**
     * Read end: attributes the time elapsed since the last timestamp to read time.
     */
    private void readEnd() {
        long now = System.nanoTime();

        readTime += now - lastTs;

        lastTs = now;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProxyOutputStream.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProxyOutputStream.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProxyOutputStream.java new file mode 100644 index 0000000..9ab552e --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProxyOutputStream.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.hadoop.igfs;

import org.apache.hadoop.fs.*;
import org.apache.ignite.internal.igfs.common.*;

import java.io.*;

/**
 * Secondary Hadoop file system output stream wrapper.
 * <p>
 * Delegates every operation to the wrapped {@link FSDataOutputStream}. It measures time spent inside
 * the delegate (write time) and time spent between calls (user time), and counts the bytes written.
 * These figures are reported to the client logger on close when logging is enabled.
 */
public class HadoopIgfsProxyOutputStream extends OutputStream {
    /** Actual output stream. */
    private FSDataOutputStream os;

    /** Client logger. */
    private final IgfsLogger clientLog;

    /** Log stream ID. */
    private final long logStreamId;

    /** Write time: nanoseconds spent inside calls to the wrapped stream. */
    private long writeTime;

    /** User time: nanoseconds elapsed between calls to the wrapped stream. */
    private long userTime;

    /** Last timestamp. */
    private long lastTs;

    /** Amount of written bytes. */
    private long total;

    /** Closed flag. */
    private boolean closed;

    /**
     * Constructor.
     *
     * @param os Actual output stream.
     * @param clientLog Client logger.
     * @param logStreamId Log stream ID.
     */
    public HadoopIgfsProxyOutputStream(FSDataOutputStream os, IgfsLogger clientLog, long logStreamId) {
        assert os != null;
        assert clientLog != null;

        this.os = os;
        this.clientLog = clientLog;
        this.logStreamId = logStreamId;

        lastTs = System.nanoTime();
    }

    /** {@inheritDoc} */
    @Override public synchronized void write(int b) throws IOException {
        writeStart();

        try {
            os.write(b);
        }
        finally {
            writeEnd();
        }

        // Counted only when the delegate call completed normally.
        total++;
    }

    /** {@inheritDoc} */
    @Override public synchronized void write(byte[] b) throws IOException {
        writeStart();

        try {
            os.write(b);
        }
        finally {
            writeEnd();
        }

        total += b.length;
    }

    /** {@inheritDoc} */
    @Override public synchronized void write(byte[] b, int off, int len) throws IOException {
        writeStart();

        try {
            os.write(b, off, len);
        }
        finally {
            writeEnd();
        }

        total += len;
    }

    /** {@inheritDoc} */
    @Override public synchronized void flush() throws IOException {
        writeStart();

        try {
            os.flush();
        }
        finally {
            writeEnd();
        }
    }

    /**
     * Closes the wrapped stream and, if client logging is enabled, reports user time, write time and
     * total bytes written. Repeated calls do nothing.
     *
     * @throws IOException If the wrapped stream fails to close.
     */
    @Override public synchronized void close() throws IOException {
        if (closed)
            return;

        closed = true;

        writeStart();

        try {
            os.close();
        }
        finally {
            writeEnd();
        }

        if (clientLog.isLogEnabled())
            clientLog.logCloseOut(logStreamId, userTime, writeTime, total);
    }

    /**
     * Write start: the interval since the previous timestamp counts as user time.
     */
    private void writeStart() {
        userTime += lap();
    }

    /**
     * Write end: the interval since the previous timestamp counts as write time.
     */
    private void writeEnd() {
        writeTime += lap();
    }

    /**
     * Takes a new timestamp and makes it the last timestamp.
     *
     * @return Nanoseconds elapsed between the previous timestamp and the new one.
     */
    private long lap() {
        long ts = System.nanoTime();

        long delta = ts - lastTs;

        lastTs = ts;

        return delta;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsSecondaryFileSystemPositionedReadable.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsSecondaryFileSystemPositionedReadable.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsSecondaryFileSystemPositionedReadable.java new file mode 100644 index 0000000..e921fd3 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsSecondaryFileSystemPositionedReadable.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
*/

package org.apache.ignite.internal.processors.hadoop.igfs;

import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.FileSystem;
import org.apache.ignite.igfs.secondary.*;
import org.apache.ignite.internal.util.typedef.internal.*;

import java.io.*;

/**
 * Secondary file system input stream wrapper which opens the underlying input stream lazily, on the
 * first read request.
 * <p>
 * The class is expected to be used only from synchronized context and therefore is not thread-safe.
 */
public class HadoopIgfsSecondaryFileSystemPositionedReadable implements IgfsSecondaryFileSystemPositionedReadable {
    /** Secondary file system. */
    private final FileSystem fs;

    /** Path to the file to open. */
    private final Path path;

    /** Buffer size. */
    private final int bufSize;

    /** Actual input stream. */
    private FSDataInputStream in;

    /** Cached error that occurred while opening the input stream. */
    private IOException err;

    /** Flag indicating that an attempt to open the stream has already been made. */
    private boolean opened;

    /**
     * Constructor.
     *
     * @param fs Secondary file system.
     * @param path Path to the file to open.
     * @param bufSize Buffer size.
     */
    public HadoopIgfsSecondaryFileSystemPositionedReadable(FileSystem fs, Path path, int bufSize) {
        assert fs != null;
        assert path != null;

        this.fs = fs;
        this.path = path;
        this.bufSize = bufSize;
    }

    /**
     * Get input stream, opening it on the first call. The open is attempted only once: if it failed
     * with an {@link IOException}, that same exception is rethrown on every subsequent call.
     *
     * @return Opened input stream.
     * @throws IOException If the stream could not be opened.
     */
    private PositionedReadable in() throws IOException {
        if (!opened) {
            opened = true;

            try {
                in = fs.open(path, bufSize);

                if (in == null)
                    throw new IOException("Failed to open input stream (file system returned null): " + path);
            }
            catch (IOException e) {
                // Remember the failure; it is thrown just below and on all later calls.
                err = e;
            }
        }

        if (err != null)
            throw err;

        return in;
    }

    /**
     * Close wrapped input stream in case it was previously opened.
     */
    @Override public void close() {
        U.closeQuiet(in);
    }

    /** {@inheritDoc} */
    @Override public int read(long pos, byte[] buf, int off, int len) throws IOException {
        PositionedReadable src = in();

        return src.read(pos, buf, off, len);
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsStreamDelegate.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsStreamDelegate.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsStreamDelegate.java new file mode 100644 index 0000000..54f7377 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsStreamDelegate.java @@ -0,0 +1,96 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.hadoop.igfs;

import org.apache.ignite.internal.util.typedef.internal.*;

/**
 * IGFS Hadoop stream descriptor.
 * <p>
 * Two descriptors are equal when they refer to the very same target object (reference identity).
 */
public class HadoopIgfsStreamDelegate {
    /** RPC handler. */
    private final HadoopIgfsEx hadoop;

    /** Target. */
    private final Object target;

    /** Optional stream length. */
    private final long len;

    /**
     * Constructor. The length is set to {@code -1}.
     *
     * @param hadoop RPC handler.
     * @param target Target.
     */
    public HadoopIgfsStreamDelegate(HadoopIgfsEx hadoop, Object target) {
        this(hadoop, target, -1);
    }

    /**
     * Constructor.
     *
     * @param hadoop RPC handler.
     * @param target Target.
     * @param len Optional length.
     */
    public HadoopIgfsStreamDelegate(HadoopIgfsEx hadoop, Object target, long len) {
        assert hadoop != null;
        assert target != null;

        this.hadoop = hadoop;
        this.target = target;
        this.len = len;
    }

    /**
     * @return RPC handler.
     */
    public HadoopIgfsEx hadoop() {
        return hadoop;
    }

    /**
     * @return Stream target, cast to the type expected by the caller.
     */
    @SuppressWarnings("unchecked")
    public <T> T target() {
        return (T) target;
    }

    /**
     * @return Length.
     */
    public long length() {
        return len;
    }

    /** {@inheritDoc} */
    @Override public int hashCode() {
        return System.identityHashCode(target);
    }

    /** {@inheritDoc} */
    @Override public boolean equals(Object obj) {
        // 'instanceof' is false for null, so no separate null check is needed.
        if (!(obj instanceof HadoopIgfsStreamDelegate))
            return false;

        HadoopIgfsStreamDelegate other = (HadoopIgfsStreamDelegate)obj;

        return target == other.target;
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return S.toString(HadoopIgfsStreamDelegate.class, this);
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsStreamEventListener.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsStreamEventListener.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsStreamEventListener.java new file mode 100644 index 0000000..6b3fa82 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsStreamEventListener.java @@ -0,0 +1,39 @@ +/* + * Licensed to the
Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.hadoop.igfs;

import org.apache.ignite.*;

/**
 * IGFS input stream event listener.
 */
public interface HadoopIgfsStreamEventListener {
    /**
     * Callback invoked when the stream is being closed.
     *
     * @throws IgniteCheckedException If failed.
     */
    public void onClose() throws IgniteCheckedException;

    /**
     * Callback invoked when remote error occurs.
     *
     * @param errMsg Error message.
     */
    public void onError(String errMsg);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsUtils.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsUtils.java new file mode 100644 index 0000000..e30a4ec --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsUtils.java @@ -0,0 +1,131 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.hadoop.igfs;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.ignite.*;
import org.apache.ignite.igfs.*;
import org.apache.ignite.internal.processors.igfs.*;
import org.jetbrains.annotations.*;

import java.io.*;

/**
 * Utility constants and methods for IGFS Hadoop file system.
 */
public class HadoopIgfsUtils {
    /** Parameter name for endpoint no embed mode flag. */
    public static final String PARAM_IGFS_ENDPOINT_NO_EMBED = "fs.igfs.%s.endpoint.no_embed";

    /** Parameter name for endpoint no shared memory flag. */
    public static final String PARAM_IGFS_ENDPOINT_NO_LOCAL_SHMEM = "fs.igfs.%s.endpoint.no_local_shmem";

    /** Parameter name for endpoint no local TCP flag. */
    public static final String PARAM_IGFS_ENDPOINT_NO_LOCAL_TCP = "fs.igfs.%s.endpoint.no_local_tcp";

    /**
     * Get string parameter.
     *
     * @param cfg Configuration.
     * @param name Parameter name (a format pattern into which the authority is substituted).
     * @param authority Authority; {@code null} is treated as an empty string.
     * @param dflt Default value.
     * @return String value.
     */
    public static String parameter(Configuration cfg, String name, String authority, String dflt) {
        return cfg.get(paramName(name, authority), dflt);
    }

    /**
     * Get integer parameter.
     *
     * @param cfg Configuration.
     * @param name Parameter name (a format pattern into which the authority is substituted).
     * @param authority Authority; {@code null} is treated as an empty string.
     * @param dflt Default value.
     * @return Integer value.
     * @throws IOException In case of parse exception; the original {@link NumberFormatException} is
     *      attached as the cause.
     */
    public static int parameter(Configuration cfg, String name, String authority, int dflt) throws IOException {
        String name0 = paramName(name, authority);

        try {
            return cfg.getInt(name0, dflt);
        }
        catch (NumberFormatException e) {
            // Keep the parse failure as the cause so the offending value is not lost.
            throw new IOException("Failed to parse parameter value to integer: " + name0, e);
        }
    }

    /**
     * Get boolean parameter.
     *
     * @param cfg Configuration.
     * @param name Parameter name (a format pattern into which the authority is substituted).
     * @param authority Authority; {@code null} is treated as an empty string.
     * @param dflt Default value.
     * @return Boolean value.
     */
    public static boolean parameter(Configuration cfg, String name, String authority, boolean dflt) {
        return cfg.getBoolean(paramName(name, authority), dflt);
    }

    /**
     * Cast Ignite exception to appropriate IO exception.
     *
     * @param e Exception to cast.
     * @return Casted exception.
     */
    public static IOException cast(IgniteCheckedException e) {
        return cast(e, null);
    }

    /**
     * Cast Ignite exception to appropriate IO exception.
     * <p>
     * The first matching rule wins: a nested {@link IOException} is returned as is; otherwise known
     * IGFS exception causes are mapped to their Hadoop or JDK counterparts; anything else is wrapped
     * into a plain {@link IOException}. The "directory not empty" and "path already exists" mappings
     * are applied only when {@code path} is not {@code null}.
     *
     * @param e Exception to cast.
     * @param path Path for exceptions.
     * @return Casted exception.
     */
    @SuppressWarnings("unchecked")
    public static IOException cast(IgniteCheckedException e, @Nullable String path) {
        assert e != null;

        // First check for any nested IOException; if exists - re-throw it.
        if (e.hasCause(IOException.class))
            return e.getCause(IOException.class);
        else if (e.hasCause(IgfsFileNotFoundException.class))
            return new FileNotFoundException(path); // TODO: Or PathNotFoundException?
        else if (e.hasCause(IgfsParentNotDirectoryException.class))
            return new ParentNotDirectoryException(path);
        else if (path != null && e.hasCause(IgfsDirectoryNotEmptyException.class))
            return new PathIsNotEmptyDirectoryException(path);
        else if (path != null && e.hasCause(IgfsPathAlreadyExistsException.class))
            return new PathExistsException(path);
        else
            return new IOException(e);
    }

    /**
     * Builds the effective configuration key by substituting the authority into the name pattern.
     *
     * @param name Parameter name pattern.
     * @param authority Authority; {@code null} is replaced with an empty string.
     * @return Formatted parameter name.
     */
    private static String paramName(String name, String authority) {
        return String.format(name, authority != null ? authority : "");
    }

    /**
     * Constructor.
     */
    private HadoopIgfsUtils() {
        // No-op.
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java new file mode 100644 index 0000000..1dada21 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java @@ -0,0 +1,511 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.igfs; + +import org.apache.commons.logging.*; +import org.apache.hadoop.conf.*; +import org.apache.ignite.*; +import org.apache.ignite.igfs.*; +import org.apache.ignite.internal.processors.igfs.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsEndpoint.*; +import static org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils.*; + +/** + * Wrapper for IGFS server. + */ +public class HadoopIgfsWrapper implements HadoopIgfs { + /** Delegate. */ + private final AtomicReference<Delegate> delegateRef = new AtomicReference<>(); + + /** Authority. */ + private final String authority; + + /** Connection string. */ + private final HadoopIgfsEndpoint endpoint; + + /** Log directory. */ + private final String logDir; + + /** Configuration. */ + private final Configuration conf; + + /** Logger. */ + private final Log log; + + /** + * Constructor. + * + * @param authority Authority (connection string). + * @param logDir Log directory for server. 
+ * @param conf Configuration. + * @param log Current logger. + */ + public HadoopIgfsWrapper(String authority, String logDir, Configuration conf, Log log) throws IOException { + try { + this.authority = authority; + this.endpoint = new HadoopIgfsEndpoint(authority); + this.logDir = logDir; + this.conf = conf; + this.log = log; + } + catch (IgniteCheckedException e) { + throw new IOException("Failed to parse endpoint: " + authority, e); + } + } + + /** {@inheritDoc} */ + @Override public IgfsHandshakeResponse handshake(String logDir) throws IOException { + return withReconnectHandling(new FileSystemClosure<IgfsHandshakeResponse>() { + @Override public IgfsHandshakeResponse apply(HadoopIgfsEx hadoop, + IgfsHandshakeResponse hndResp) { + return hndResp; + } + }); + } + + /** {@inheritDoc} */ + @Override public void close(boolean force) { + Delegate delegate = delegateRef.get(); + + if (delegate != null && delegateRef.compareAndSet(delegate, null)) + delegate.close(force); + } + + /** {@inheritDoc} */ + @Override public IgfsFile info(final IgfsPath path) throws IOException { + return withReconnectHandling(new FileSystemClosure<IgfsFile>() { + @Override public IgfsFile apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp) + throws IgniteCheckedException, IOException { + return hadoop.info(path); + } + }, path); + } + + /** {@inheritDoc} */ + @Override public IgfsFile update(final IgfsPath path, final Map<String, String> props) throws IOException { + return withReconnectHandling(new FileSystemClosure<IgfsFile>() { + @Override public IgfsFile apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp) + throws IgniteCheckedException, IOException { + return hadoop.update(path, props); + } + }, path); + } + + /** {@inheritDoc} */ + @Override public Boolean setTimes(final IgfsPath path, final long accessTime, final long modificationTime) + throws IOException { + return withReconnectHandling(new FileSystemClosure<Boolean>() { + @Override public Boolean apply(HadoopIgfsEx 
hadoop, IgfsHandshakeResponse hndResp) + throws IgniteCheckedException, IOException { + return hadoop.setTimes(path, accessTime, modificationTime); + } + }, path); + } + + /** {@inheritDoc} */ + @Override public Boolean rename(final IgfsPath src, final IgfsPath dest) throws IOException { + return withReconnectHandling(new FileSystemClosure<Boolean>() { + @Override public Boolean apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp) + throws IgniteCheckedException, IOException { + return hadoop.rename(src, dest); + } + }, src); + } + + /** {@inheritDoc} */ + @Override public Boolean delete(final IgfsPath path, final boolean recursive) throws IOException { + return withReconnectHandling(new FileSystemClosure<Boolean>() { + @Override public Boolean apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp) + throws IgniteCheckedException, IOException { + return hadoop.delete(path, recursive); + } + }, path); + } + + /** {@inheritDoc} */ + @Override public Collection<IgfsBlockLocation> affinity(final IgfsPath path, final long start, + final long len) throws IOException { + return withReconnectHandling(new FileSystemClosure<Collection<IgfsBlockLocation>>() { + @Override public Collection<IgfsBlockLocation> apply(HadoopIgfsEx hadoop, + IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException { + return hadoop.affinity(path, start, len); + } + }, path); + } + + /** {@inheritDoc} */ + @Override public IgfsPathSummary contentSummary(final IgfsPath path) throws IOException { + return withReconnectHandling(new FileSystemClosure<IgfsPathSummary>() { + @Override public IgfsPathSummary apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp) + throws IgniteCheckedException, IOException { + return hadoop.contentSummary(path); + } + }, path); + } + + /** {@inheritDoc} */ + @Override public Boolean mkdirs(final IgfsPath path, final Map<String, String> props) throws IOException { + return withReconnectHandling(new FileSystemClosure<Boolean>() { + @Override 
public Boolean apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp) + throws IgniteCheckedException, IOException { + return hadoop.mkdirs(path, props); + } + }, path); + } + + /** {@inheritDoc} */ + @Override public Collection<IgfsFile> listFiles(final IgfsPath path) throws IOException { + return withReconnectHandling(new FileSystemClosure<Collection<IgfsFile>>() { + @Override public Collection<IgfsFile> apply(HadoopIgfsEx hadoop, + IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException { + return hadoop.listFiles(path); + } + }, path); + } + + /** {@inheritDoc} */ + @Override public Collection<IgfsPath> listPaths(final IgfsPath path) throws IOException { + return withReconnectHandling(new FileSystemClosure<Collection<IgfsPath>>() { + @Override public Collection<IgfsPath> apply(HadoopIgfsEx hadoop, + IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException { + return hadoop.listPaths(path); + } + }, path); + } + + /** {@inheritDoc} */ + @Override public IgfsStatus fsStatus() throws IOException { + return withReconnectHandling(new FileSystemClosure<IgfsStatus>() { + @Override public IgfsStatus apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp) + throws IgniteCheckedException, IOException { + return hadoop.fsStatus(); + } + }); + } + + /** {@inheritDoc} */ + @Override public HadoopIgfsStreamDelegate open(final IgfsPath path) throws IOException { + return withReconnectHandling(new FileSystemClosure<HadoopIgfsStreamDelegate>() { + @Override public HadoopIgfsStreamDelegate apply(HadoopIgfsEx hadoop, + IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException { + return hadoop.open(path); + } + }, path); + } + + /** {@inheritDoc} */ + @Override public HadoopIgfsStreamDelegate open(final IgfsPath path, final int seqReadsBeforePrefetch) + throws IOException { + return withReconnectHandling(new FileSystemClosure<HadoopIgfsStreamDelegate>() { + @Override public HadoopIgfsStreamDelegate apply(HadoopIgfsEx hadoop, 
+ IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException { + return hadoop.open(path, seqReadsBeforePrefetch); + } + }, path); + } + + /** {@inheritDoc} */ + @Override public HadoopIgfsStreamDelegate create(final IgfsPath path, final boolean overwrite, + final boolean colocate, final int replication, final long blockSize, @Nullable final Map<String, String> props) + throws IOException { + return withReconnectHandling(new FileSystemClosure<HadoopIgfsStreamDelegate>() { + @Override public HadoopIgfsStreamDelegate apply(HadoopIgfsEx hadoop, + IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException { + return hadoop.create(path, overwrite, colocate, replication, blockSize, props); + } + }, path); + } + + /** {@inheritDoc} */ + @Override public HadoopIgfsStreamDelegate append(final IgfsPath path, final boolean create, + @Nullable final Map<String, String> props) throws IOException { + return withReconnectHandling(new FileSystemClosure<HadoopIgfsStreamDelegate>() { + @Override public HadoopIgfsStreamDelegate apply(HadoopIgfsEx hadoop, + IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException { + return hadoop.append(path, create, props); + } + }, path); + } + + /** + * Execute closure which is not path-specific. + * + * @param clo Closure. + * @return Result. + * @throws IOException If failed. + */ + private <T> T withReconnectHandling(FileSystemClosure<T> clo) throws IOException { + return withReconnectHandling(clo, null); + } + + /** + * Execute closure. + * + * @param clo Closure. + * @param path Path for exceptions. + * @return Result. + * @throws IOException If failed. 
+ */ + private <T> T withReconnectHandling(final FileSystemClosure<T> clo, @Nullable IgfsPath path) + throws IOException { + Exception err = null; + + for (int i = 0; i < 2; i++) { + Delegate curDelegate = null; + + boolean close = false; + boolean force = false; + + try { + curDelegate = delegate(); + + assert curDelegate != null; + + close = curDelegate.doomed; + + return clo.apply(curDelegate.hadoop, curDelegate.hndResp); + } + catch (HadoopIgfsCommunicationException e) { + if (curDelegate != null && !curDelegate.doomed) { + // Try getting rid of faulty delegate ASAP. + delegateRef.compareAndSet(curDelegate, null); + + close = true; + force = true; + } + + if (log.isDebugEnabled()) + log.debug("Failed to send message to a server: " + e); + + err = e; + } + catch (IgniteCheckedException e) { + throw HadoopIgfsUtils.cast(e, path != null ? path.toString() : null); + } + finally { + if (close) { + assert curDelegate != null; + + curDelegate.close(force); + } + } + } + + throw new IOException("Failed to communicate with IGFS.", err); + } + + /** + * Get delegate creating it if needed. + * + * @return Delegate. + * @throws HadoopIgfsCommunicationException If connection to IGFS could not be established. + */ + private Delegate delegate() throws HadoopIgfsCommunicationException { + Exception err = null; + + // 1. If delegate is set, return it immediately. + Delegate curDelegate = delegateRef.get(); + + if (curDelegate != null) + return curDelegate; + + // 2. Guess that we are in the same VM. 
+ if (!parameter(conf, PARAM_IGFS_ENDPOINT_NO_EMBED, authority, false)) { + IgfsEx igfs = null; + + if (endpoint.grid() == null) { + try { + Ignite ignite = G.ignite(); + + igfs = (IgfsEx)ignite.fileSystem(endpoint.igfs()); + } + catch (Exception e) { + err = e; + } + } + else { + for (Ignite ignite : G.allGrids()) { + try { + igfs = (IgfsEx)ignite.fileSystem(endpoint.igfs()); + + break; + } + catch (Exception e) { + err = e; + } + } + } + + if (igfs != null) { + HadoopIgfsEx hadoop = null; + + try { + hadoop = new HadoopIgfsInProc(igfs, log); + + curDelegate = new Delegate(hadoop, hadoop.handshake(logDir)); + } + catch (IOException | IgniteCheckedException e) { + if (e instanceof HadoopIgfsCommunicationException) + hadoop.close(true); + + if (log.isDebugEnabled()) + log.debug("Failed to connect to in-proc IGFS, fallback to IPC mode.", e); + + err = e; + } + } + } + + // 3. Try connecting using shmem. + if (!parameter(conf, PARAM_IGFS_ENDPOINT_NO_LOCAL_SHMEM, authority, false)) { + if (curDelegate == null && !U.isWindows()) { + HadoopIgfsEx hadoop = null; + + try { + hadoop = new HadoopIgfsOutProc(endpoint.port(), endpoint.grid(), endpoint.igfs(), log); + + curDelegate = new Delegate(hadoop, hadoop.handshake(logDir)); + } + catch (IOException | IgniteCheckedException e) { + if (e instanceof HadoopIgfsCommunicationException) + hadoop.close(true); + + if (log.isDebugEnabled()) + log.debug("Failed to connect to out-proc local IGFS using shmem.", e); + + err = e; + } + } + } + + // 4. Try local TCP connection. 
+ boolean skipLocTcp = parameter(conf, PARAM_IGFS_ENDPOINT_NO_LOCAL_TCP, authority, false); + + if (!skipLocTcp) { + if (curDelegate == null) { + HadoopIgfsEx hadoop = null; + + try { + hadoop = new HadoopIgfsOutProc(LOCALHOST, endpoint.port(), endpoint.grid(), endpoint.igfs(), + log); + + curDelegate = new Delegate(hadoop, hadoop.handshake(logDir)); + } + catch (IOException | IgniteCheckedException e) { + if (e instanceof HadoopIgfsCommunicationException) + hadoop.close(true); + + if (log.isDebugEnabled()) + log.debug("Failed to connect to out-proc local IGFS using TCP.", e); + + err = e; + } + } + } + + // 5. Try remote TCP connection. + if (curDelegate == null && (skipLocTcp || !F.eq(LOCALHOST, endpoint.host()))) { + HadoopIgfsEx hadoop = null; + + try { + hadoop = new HadoopIgfsOutProc(endpoint.host(), endpoint.port(), endpoint.grid(), endpoint.igfs(), log); + + curDelegate = new Delegate(hadoop, hadoop.handshake(logDir)); + } + catch (IOException | IgniteCheckedException e) { + if (e instanceof HadoopIgfsCommunicationException) + hadoop.close(true); + + if (log.isDebugEnabled()) + log.debug("Failed to connect to out-proc remote IGFS using TCP.", e); + + err = e; + } + } + + if (curDelegate != null) { + if (!delegateRef.compareAndSet(null, curDelegate)) + curDelegate.doomed = true; + + return curDelegate; + } + else + throw new HadoopIgfsCommunicationException("Failed to connect to IGFS: " + endpoint, err); + } + + /** + * File system operation closure. + */ + private static interface FileSystemClosure<T> { + /** + * Call closure body. + * + * @param hadoop RPC handler. + * @param hndResp Handshake response. + * @return Result. + * @throws IgniteCheckedException If failed. + * @throws IOException If failed. + */ + public T apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException; + } + + /** + * Delegate. + */ + private static class Delegate { + /** RPC handler. 
+ */ + private final HadoopIgfsEx hadoop; + + /** Handshake response. */ + private final IgfsHandshakeResponse hndResp; + + /** Close guard. */ + private final AtomicBoolean closeGuard = new AtomicBoolean(); + + /** Whether this delegate must be closed at the end of the next invocation. */ + private boolean doomed; + + /** + * Constructor. + * + * @param hadoop Hadoop. + * @param hndResp Handshake response. + */ + private Delegate(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp) { + this.hadoop = hadoop; + this.hndResp = hndResp; + } + + /** + * Close underlying RPC handler. + * + * @param force Force flag. + */ + private void close(boolean force) { + if (closeGuard.compareAndSet(false, true)) + hadoop.close(force); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/GridHadoopJobMetadata.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/GridHadoopJobMetadata.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/GridHadoopJobMetadata.java deleted file mode 100644 index b124312..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/GridHadoopJobMetadata.java +++ /dev/null @@ -1,305 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.jobtracker; - -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.processors.hadoop.counter.*; -import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.*; -import org.apache.ignite.internal.util.tostring.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.io.*; -import java.util.*; - -import static org.apache.ignite.internal.processors.hadoop.GridHadoopJobPhase.*; - -/** - * Hadoop job metadata. Internal object used for distributed job state tracking. - */ -public class GridHadoopJobMetadata implements Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Job ID. */ - private GridHadoopJobId jobId; - - /** Job info. */ - private GridHadoopJobInfo jobInfo; - - /** Node submitted job. */ - private UUID submitNodeId; - - /** Map-reduce plan. */ - private GridHadoopMapReducePlan mrPlan; - - /** Pending splits for which mapper should be executed. */ - private Map<GridHadoopInputSplit, Integer> pendingSplits; - - /** Pending reducers. */ - private Collection<Integer> pendingReducers; - - /** Reducers addresses. */ - @GridToStringInclude - private Map<Integer, GridHadoopProcessDescriptor> reducersAddrs; - - /** Job phase. */ - private GridHadoopJobPhase phase = PHASE_SETUP; - - /** Fail cause. */ - @GridToStringExclude - private Throwable failCause; - - /** Version. 
*/ - private long ver; - - /** Job counters */ - private GridHadoopCounters counters = new GridHadoopCountersImpl(); - - /** - * Empty constructor required by {@link Externalizable}. - */ - public GridHadoopJobMetadata() { - // No-op. - } - - /** - * Constructor. - * - * @param submitNodeId Submit node ID. - * @param jobId Job ID. - * @param jobInfo Job info. - */ - public GridHadoopJobMetadata(UUID submitNodeId, GridHadoopJobId jobId, GridHadoopJobInfo jobInfo) { - this.jobId = jobId; - this.jobInfo = jobInfo; - this.submitNodeId = submitNodeId; - } - - /** - * Copy constructor. - * - * @param src Metadata to copy. - */ - public GridHadoopJobMetadata(GridHadoopJobMetadata src) { - // Make sure to preserve alphabetic order. - counters = src.counters; - failCause = src.failCause; - jobId = src.jobId; - jobInfo = src.jobInfo; - mrPlan = src.mrPlan; - pendingSplits = src.pendingSplits; - pendingReducers = src.pendingReducers; - phase = src.phase; - reducersAddrs = src.reducersAddrs; - submitNodeId = src.submitNodeId; - ver = src.ver + 1; - } - - /** - * @return Submit node ID. - */ - public UUID submitNodeId() { - return submitNodeId; - } - - /** - * @param phase Job phase. - */ - public void phase(GridHadoopJobPhase phase) { - this.phase = phase; - } - - /** - * @return Job phase. - */ - public GridHadoopJobPhase phase() { - return phase; - } - - /** - * Gets reducers addresses for external execution. - * - * @return Reducers addresses. - */ - public Map<Integer, GridHadoopProcessDescriptor> reducersAddresses() { - return reducersAddrs; - } - - /** - * Sets reducers addresses for external execution. - * - * @param reducersAddrs Map of addresses. - */ - public void reducersAddresses(Map<Integer, GridHadoopProcessDescriptor> reducersAddrs) { - this.reducersAddrs = reducersAddrs; - } - - /** - * Sets collection of pending splits. - * - * @param pendingSplits Collection of pending splits. 
- */ - public void pendingSplits(Map<GridHadoopInputSplit, Integer> pendingSplits) { - this.pendingSplits = pendingSplits; - } - - /** - * Gets collection of pending splits. - * - * @return Collection of pending splits. - */ - public Map<GridHadoopInputSplit, Integer> pendingSplits() { - return pendingSplits; - } - - /** - * Sets collection of pending reducers. - * - * @param pendingReducers Collection of pending reducers. - */ - public void pendingReducers(Collection<Integer> pendingReducers) { - this.pendingReducers = pendingReducers; - } - - /** - * Gets collection of pending reducers. - * - * @return Collection of pending reducers. - */ - public Collection<Integer> pendingReducers() { - return pendingReducers; - } - - /** - * @return Job ID. - */ - public GridHadoopJobId jobId() { - return jobId; - } - - /** - * @param mrPlan Map-reduce plan. - */ - public void mapReducePlan(GridHadoopMapReducePlan mrPlan) { - assert this.mrPlan == null : "Map-reduce plan can only be initialized once."; - - this.mrPlan = mrPlan; - } - - /** - * @return Map-reduce plan. - */ - public GridHadoopMapReducePlan mapReducePlan() { - return mrPlan; - } - - /** - * @return Job info. - */ - public GridHadoopJobInfo jobInfo() { - return jobInfo; - } - - /** - * Returns job counters. - * - * @return Collection of counters. - */ - public GridHadoopCounters counters() { - return counters; - } - - /** - * Sets counters. - * - * @param counters Collection of counters. - */ - public void counters(GridHadoopCounters counters) { - this.counters = counters; - } - - /** - * @param failCause Fail cause. - */ - public void failCause(Throwable failCause) { - assert failCause != null; - - if (this.failCause == null) // Keep the first error. - this.failCause = failCause; - } - - /** - * @return Fail cause. - */ - public Throwable failCause() { - return failCause; - } - - /** - * @return Version. - */ - public long version() { - return ver; - } - - /** - * @param split Split. - * @return Task number. 
- */ - public int taskNumber(GridHadoopInputSplit split) { - return pendingSplits.get(split); - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeUuid(out, submitNodeId); - out.writeObject(jobId); - out.writeObject(jobInfo); - out.writeObject(mrPlan); - out.writeObject(pendingSplits); - out.writeObject(pendingReducers); - out.writeObject(phase); - out.writeObject(failCause); - out.writeLong(ver); - out.writeObject(reducersAddrs); - out.writeObject(counters); - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - submitNodeId = U.readUuid(in); - jobId = (GridHadoopJobId)in.readObject(); - jobInfo = (GridHadoopJobInfo)in.readObject(); - mrPlan = (GridHadoopMapReducePlan)in.readObject(); - pendingSplits = (Map<GridHadoopInputSplit,Integer>)in.readObject(); - pendingReducers = (Collection<Integer>)in.readObject(); - phase = (GridHadoopJobPhase)in.readObject(); - failCause = (Throwable)in.readObject(); - ver = in.readLong(); - reducersAddrs = (Map<Integer, GridHadoopProcessDescriptor>)in.readObject(); - counters = (GridHadoopCounters)in.readObject(); - } - - /** {@inheritDoc} */ - public String toString() { - return S.toString(GridHadoopJobMetadata.class, this, "pendingMaps", pendingSplits.size(), - "pendingReduces", pendingReducers.size(), "failCause", failCause == null ? null : - failCause.getClass().getName()); - } -}