http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06931b4b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/ggfs/hadoop/GridGgfsHadoopOutProc.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/gridgain/grid/kernal/ggfs/hadoop/GridGgfsHadoopOutProc.java b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/ggfs/hadoop/GridGgfsHadoopOutProc.java index 60b1db1..91e5670 100644 --- a/modules/hadoop/src/main/java/org/gridgain/grid/kernal/ggfs/hadoop/GridGgfsHadoopOutProc.java +++ b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/ggfs/hadoop/GridGgfsHadoopOutProc.java @@ -10,6 +10,7 @@ package org.gridgain.grid.kernal.ggfs.hadoop; import org.apache.commons.logging.*; +import org.apache.ignite.*; import org.apache.ignite.fs.*; import org.gridgain.grid.*; import org.gridgain.grid.kernal.ggfs.common.*; @@ -134,7 +135,7 @@ public class GridGgfsHadoopOutProc implements GridGgfsHadoopEx, GridGgfsHadoopIp } /** {@inheritDoc} */ - @Override public GridGgfsHandshakeResponse handshake(String logDir) throws GridException { + @Override public GridGgfsHandshakeResponse handshake(String logDir) throws IgniteCheckedException { final GridGgfsHandshakeRequest req = new GridGgfsHandshakeRequest(); req.gridName(grid); @@ -157,7 +158,7 @@ public class GridGgfsHadoopOutProc implements GridGgfsHadoopEx, GridGgfsHadoopIp } /** {@inheritDoc} */ - @Override public IgniteFsFile info(IgniteFsPath path) throws GridException { + @Override public IgniteFsFile info(IgniteFsPath path) throws IgniteCheckedException { final GridGgfsPathControlRequest msg = new GridGgfsPathControlRequest(); msg.command(INFO); @@ -167,7 +168,7 @@ public class GridGgfsHadoopOutProc implements GridGgfsHadoopEx, GridGgfsHadoopIp } /** {@inheritDoc} */ - @Override public IgniteFsFile update(IgniteFsPath path, Map<String, String> props) throws GridException { + @Override public IgniteFsFile update(IgniteFsPath path, Map<String, String> props) throws IgniteCheckedException { final GridGgfsPathControlRequest msg = new GridGgfsPathControlRequest(); msg.command(UPDATE); @@ -178,7 +179,7 @@ public class GridGgfsHadoopOutProc implements GridGgfsHadoopEx, GridGgfsHadoopIp } /** {@inheritDoc} */ - @Override public Boolean setTimes(IgniteFsPath path, long accessTime, long modificationTime) throws GridException { + @Override public Boolean setTimes(IgniteFsPath path, long accessTime, long modificationTime) throws IgniteCheckedException { final GridGgfsPathControlRequest msg = new GridGgfsPathControlRequest(); msg.command(SET_TIMES); @@ -190,7 +191,7 @@ public class GridGgfsHadoopOutProc implements GridGgfsHadoopEx, GridGgfsHadoopIp } /** {@inheritDoc} */ - @Override public Boolean rename(IgniteFsPath src, IgniteFsPath dest) throws GridException { + @Override public Boolean rename(IgniteFsPath src, IgniteFsPath dest) throws IgniteCheckedException { final GridGgfsPathControlRequest msg = new GridGgfsPathControlRequest(); msg.command(RENAME); @@ -201,7 +202,7 @@ public class GridGgfsHadoopOutProc implements GridGgfsHadoopEx, GridGgfsHadoopIp } /** {@inheritDoc} */ - @Override public Boolean delete(IgniteFsPath path, boolean recursive) throws GridException { + @Override public Boolean delete(IgniteFsPath path, boolean recursive) throws IgniteCheckedException { final GridGgfsPathControlRequest msg = new GridGgfsPathControlRequest(); msg.command(DELETE); @@ -213,7 +214,7 @@ public class GridGgfsHadoopOutProc implements GridGgfsHadoopEx, GridGgfsHadoopIp /** {@inheritDoc} */ @Override public Collection<IgniteFsBlockLocation> affinity(IgniteFsPath path, long start, long len) - throws GridException { + throws IgniteCheckedException { final GridGgfsPathControlRequest msg = new GridGgfsPathControlRequest(); msg.command(AFFINITY); @@ -225,7 +226,7 @@ public class GridGgfsHadoopOutProc implements GridGgfsHadoopEx, GridGgfsHadoopIp } /** {@inheritDoc} */ - @Override public IgniteFsPathSummary contentSummary(IgniteFsPath path) throws GridException { + @Override public IgniteFsPathSummary contentSummary(IgniteFsPath path) throws IgniteCheckedException { final GridGgfsPathControlRequest msg = new GridGgfsPathControlRequest(); msg.command(PATH_SUMMARY); @@ -235,7 +236,7 @@ public class GridGgfsHadoopOutProc implements GridGgfsHadoopEx, GridGgfsHadoopIp } /** {@inheritDoc} */ - @Override public Boolean mkdirs(IgniteFsPath path, Map<String, String> props) throws GridException { + @Override public Boolean mkdirs(IgniteFsPath path, Map<String, String> props) throws IgniteCheckedException { final GridGgfsPathControlRequest msg = new GridGgfsPathControlRequest(); msg.command(MAKE_DIRECTORIES); @@ -246,7 +247,7 @@ public class GridGgfsHadoopOutProc implements GridGgfsHadoopEx, GridGgfsHadoopIp } /** {@inheritDoc} */ - @Override public Collection<IgniteFsFile> listFiles(IgniteFsPath path) throws GridException { + @Override public Collection<IgniteFsFile> listFiles(IgniteFsPath path) throws IgniteCheckedException { final GridGgfsPathControlRequest msg = new GridGgfsPathControlRequest(); msg.command(LIST_FILES); @@ -256,7 +257,7 @@ public class GridGgfsHadoopOutProc implements GridGgfsHadoopEx, GridGgfsHadoopIp } /** {@inheritDoc} */ - @Override public Collection<IgniteFsPath> listPaths(IgniteFsPath path) throws GridException { + @Override public Collection<IgniteFsPath> listPaths(IgniteFsPath path) throws IgniteCheckedException { final GridGgfsPathControlRequest msg = new GridGgfsPathControlRequest(); msg.command(LIST_PATHS); @@ -266,12 +267,12 @@ public class GridGgfsHadoopOutProc implements GridGgfsHadoopEx, GridGgfsHadoopIp } /** {@inheritDoc} */ - @Override public GridGgfsStatus fsStatus() throws GridException { + @Override public GridGgfsStatus fsStatus() throws IgniteCheckedException { return io.send(new GridGgfsStatusRequest()).chain(STATUS_RES).get(); } /** {@inheritDoc} */ - @Override public GridGgfsHadoopStreamDelegate open(IgniteFsPath path) throws GridException { + @Override public GridGgfsHadoopStreamDelegate open(IgniteFsPath path) throws IgniteCheckedException { final GridGgfsPathControlRequest msg = new GridGgfsPathControlRequest(); msg.command(OPEN_READ); @@ -285,7 +286,7 @@ public class GridGgfsHadoopOutProc implements GridGgfsHadoopEx, GridGgfsHadoopIp /** {@inheritDoc} */ @Override public GridGgfsHadoopStreamDelegate open(IgniteFsPath path, - int seqReadsBeforePrefetch) throws GridException { + int seqReadsBeforePrefetch) throws IgniteCheckedException { final GridGgfsPathControlRequest msg = new GridGgfsPathControlRequest(); msg.command(OPEN_READ); @@ -300,7 +301,7 @@ public class GridGgfsHadoopOutProc implements GridGgfsHadoopEx, GridGgfsHadoopIp /** {@inheritDoc} */ @Override public GridGgfsHadoopStreamDelegate create(IgniteFsPath path, boolean overwrite, boolean colocate, - int replication, long blockSize, @Nullable Map<String, String> props) throws GridException { + int replication, long blockSize, @Nullable Map<String, String> props) throws IgniteCheckedException { final GridGgfsPathControlRequest msg = new GridGgfsPathControlRequest(); msg.command(OPEN_CREATE); @@ -318,7 +319,7 @@ public class GridGgfsHadoopOutProc implements GridGgfsHadoopEx, GridGgfsHadoopIp /** {@inheritDoc} */ @Override public GridGgfsHadoopStreamDelegate append(IgniteFsPath path, boolean create, - @Nullable Map<String, String> props) throws GridException { + @Nullable Map<String, String> props) throws IgniteCheckedException { final GridGgfsPathControlRequest msg = new GridGgfsPathControlRequest(); msg.command(OPEN_APPEND); @@ -346,7 +347,7 @@ public class GridGgfsHadoopOutProc implements GridGgfsHadoopEx, GridGgfsHadoopIp try { return io.send(msg, outBuf, outOff, outLen); } - catch (GridException e) { + catch (IgniteCheckedException e) { return new GridPlainFutureAdapter<>(e); } } @@ -365,7 +366,7 @@ public class GridGgfsHadoopOutProc implements GridGgfsHadoopEx, GridGgfsHadoopIp try { io.sendPlain(msg); } - catch (GridException e) { + catch (IgniteCheckedException e) { throw GridGgfsHadoopUtils.cast(e); } } @@ -385,7 +386,7 @@ public class GridGgfsHadoopOutProc implements GridGgfsHadoopEx, GridGgfsHadoopIp try { io.send(msg).chain(BOOL_RES).get(); } - catch (GridException e) { + catch (IgniteCheckedException e) { throw GridGgfsHadoopUtils.cast(e); } } @@ -419,7 +420,7 @@ public class GridGgfsHadoopOutProc implements GridGgfsHadoopEx, GridGgfsHadoopIp try { lsnr.onClose(); } - catch (GridException e) { + catch (IgniteCheckedException e) { log.warn("Got exception from stream event listener (will ignore): " + lsnr, e); } } @@ -445,7 +446,7 @@ public class GridGgfsHadoopOutProc implements GridGgfsHadoopEx, GridGgfsHadoopIp @SuppressWarnings("unchecked") private static <T> GridPlainClosure<GridPlainFuture<GridGgfsMessage>, T> createClosure() { return new GridPlainClosure<GridPlainFuture<GridGgfsMessage>, T>() { - @Override public T apply(GridPlainFuture<GridGgfsMessage> fut) throws GridException { + @Override public T apply(GridPlainFuture<GridGgfsMessage> fut) throws IgniteCheckedException { GridGgfsControlResponse res = (GridGgfsControlResponse)fut.get(); if (res.hasError())
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06931b4b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/ggfs/hadoop/GridGgfsHadoopOutputStream.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/gridgain/grid/kernal/ggfs/hadoop/GridGgfsHadoopOutputStream.java b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/ggfs/hadoop/GridGgfsHadoopOutputStream.java index 44b35aa..5f91bbb 100644 --- a/modules/hadoop/src/main/java/org/gridgain/grid/kernal/ggfs/hadoop/GridGgfsHadoopOutputStream.java +++ b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/ggfs/hadoop/GridGgfsHadoopOutputStream.java @@ -10,6 +10,7 @@ package org.gridgain.grid.kernal.ggfs.hadoop; import org.apache.commons.logging.*; +import org.apache.ignite.*; import org.gridgain.grid.*; import org.gridgain.grid.kernal.ggfs.common.*; import org.jetbrains.annotations.*; @@ -182,7 +183,7 @@ public class GridGgfsHadoopOutputStream extends OutputStream implements GridGgfs } /** {@inheritDoc} */ - @Override public void onClose() throws GridException { + @Override public void onClose() throws IgniteCheckedException { markClosed(true); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06931b4b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/ggfs/hadoop/GridGgfsHadoopStreamEventListener.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/gridgain/grid/kernal/ggfs/hadoop/GridGgfsHadoopStreamEventListener.java b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/ggfs/hadoop/GridGgfsHadoopStreamEventListener.java index 6bbb5a0..7af3c51 100644 --- a/modules/hadoop/src/main/java/org/gridgain/grid/kernal/ggfs/hadoop/GridGgfsHadoopStreamEventListener.java +++ b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/ggfs/hadoop/GridGgfsHadoopStreamEventListener.java @@ -9,6 +9,7 @@ package org.gridgain.grid.kernal.ggfs.hadoop; +import org.apache.ignite.*; import org.gridgain.grid.*; /** @@ -18,9 +19,9 @@ public interface GridGgfsHadoopStreamEventListener { /** * Callback invoked when the stream is being closed. * - * @throws GridException If failed. + * @throws IgniteCheckedException If failed. */ - public void onClose() throws GridException; + public void onClose() throws IgniteCheckedException; /** * Callback invoked when remote error occurs. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06931b4b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/ggfs/hadoop/GridGgfsHadoopUtils.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/gridgain/grid/kernal/ggfs/hadoop/GridGgfsHadoopUtils.java b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/ggfs/hadoop/GridGgfsHadoopUtils.java index e81b17d..dcb404a 100644 --- a/modules/hadoop/src/main/java/org/gridgain/grid/kernal/ggfs/hadoop/GridGgfsHadoopUtils.java +++ b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/ggfs/hadoop/GridGgfsHadoopUtils.java @@ -11,6 +11,7 @@ package org.gridgain.grid.kernal.ggfs.hadoop; import org.apache.hadoop.conf.*; import org.apache.hadoop.fs.*; +import org.apache.ignite.*; import org.apache.ignite.fs.*; import org.gridgain.grid.*; import org.gridgain.grid.kernal.processors.ggfs.*; @@ -84,7 +85,7 @@ public class GridGgfsHadoopUtils { * @param e Exception to cast. * @return Casted exception. */ - public static IOException cast(GridException e) { + public static IOException cast(IgniteCheckedException e) { return cast(e, null); } @@ -96,7 +97,7 @@ public class GridGgfsHadoopUtils { * @return Casted exception. */ @SuppressWarnings("unchecked") - public static IOException cast(GridException e, @Nullable String path) { + public static IOException cast(IgniteCheckedException e, @Nullable String path) { assert e != null; // First check for any nested IOException; if exists - re-throw it. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06931b4b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/ggfs/hadoop/GridGgfsHadoopWrapper.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/gridgain/grid/kernal/ggfs/hadoop/GridGgfsHadoopWrapper.java b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/ggfs/hadoop/GridGgfsHadoopWrapper.java index 05fc3a1..a393a04 100644 --- a/modules/hadoop/src/main/java/org/gridgain/grid/kernal/ggfs/hadoop/GridGgfsHadoopWrapper.java +++ b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/ggfs/hadoop/GridGgfsHadoopWrapper.java @@ -64,7 +64,7 @@ public class GridGgfsHadoopWrapper implements GridGgfsHadoop { this.conf = conf; this.log = log; } - catch (GridException e) { + catch (IgniteCheckedException e) { throw new IOException("Failed to parse endpoint: " + authority, e); } } @@ -73,7 +73,7 @@ public class GridGgfsHadoopWrapper implements GridGgfsHadoop { @Override public GridGgfsHandshakeResponse handshake(String logDir) throws IOException { return withReconnectHandling(new FileSystemClosure<GridGgfsHandshakeResponse>() { @Override public GridGgfsHandshakeResponse apply(GridGgfsHadoopEx hadoop, - GridGgfsHandshakeResponse hndResp) throws GridException, IOException { + GridGgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException { return hndResp; } }); @@ -91,7 +91,7 @@ public class GridGgfsHadoopWrapper implements GridGgfsHadoop { @Override public IgniteFsFile info(final IgniteFsPath path) throws IOException { return withReconnectHandling(new FileSystemClosure<IgniteFsFile>() { @Override public IgniteFsFile apply(GridGgfsHadoopEx hadoop, GridGgfsHandshakeResponse hndResp) - throws GridException, IOException { + throws IgniteCheckedException, IOException { return hadoop.info(path); } }, path); @@ -101,7 +101,7 @@ public class GridGgfsHadoopWrapper implements GridGgfsHadoop { @Override public IgniteFsFile update(final IgniteFsPath path, final Map<String, String> props) throws IOException { return withReconnectHandling(new FileSystemClosure<IgniteFsFile>() { @Override public IgniteFsFile apply(GridGgfsHadoopEx hadoop, GridGgfsHandshakeResponse hndResp) - throws GridException, IOException { + throws IgniteCheckedException, IOException { return hadoop.update(path, props); } }, path); @@ -112,7 +112,7 @@ public class GridGgfsHadoopWrapper implements GridGgfsHadoop { throws IOException { return withReconnectHandling(new FileSystemClosure<Boolean>() { @Override public Boolean apply(GridGgfsHadoopEx hadoop, GridGgfsHandshakeResponse hndResp) - throws GridException, IOException { + throws IgniteCheckedException, IOException { return hadoop.setTimes(path, accessTime, modificationTime); } }, path); @@ -122,7 +122,7 @@ public class GridGgfsHadoopWrapper implements GridGgfsHadoop { @Override public Boolean rename(final IgniteFsPath src, final IgniteFsPath dest) throws IOException { return withReconnectHandling(new FileSystemClosure<Boolean>() { @Override public Boolean apply(GridGgfsHadoopEx hadoop, GridGgfsHandshakeResponse hndResp) - throws GridException, IOException { + throws IgniteCheckedException, IOException { return hadoop.rename(src, dest); } }, src); @@ -132,7 +132,7 @@ public class GridGgfsHadoopWrapper implements GridGgfsHadoop { @Override public Boolean delete(final IgniteFsPath path, final boolean recursive) throws IOException { return withReconnectHandling(new FileSystemClosure<Boolean>() { @Override public Boolean apply(GridGgfsHadoopEx hadoop, GridGgfsHandshakeResponse hndResp) - throws GridException, IOException { + throws IgniteCheckedException, IOException { return hadoop.delete(path, recursive); } }, path); @@ -143,7 +143,7 @@ public class GridGgfsHadoopWrapper implements GridGgfsHadoop { final long len) throws IOException { return withReconnectHandling(new FileSystemClosure<Collection<IgniteFsBlockLocation>>() { @Override public Collection<IgniteFsBlockLocation> apply(GridGgfsHadoopEx hadoop, - GridGgfsHandshakeResponse hndResp) throws GridException, IOException { + GridGgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException { return hadoop.affinity(path, start, len); } }, path); @@ -153,7 +153,7 @@ public class GridGgfsHadoopWrapper implements GridGgfsHadoop { @Override public IgniteFsPathSummary contentSummary(final IgniteFsPath path) throws IOException { return withReconnectHandling(new FileSystemClosure<IgniteFsPathSummary>() { @Override public IgniteFsPathSummary apply(GridGgfsHadoopEx hadoop, GridGgfsHandshakeResponse hndResp) - throws GridException, IOException { + throws IgniteCheckedException, IOException { return hadoop.contentSummary(path); } }, path); @@ -163,7 +163,7 @@ public class GridGgfsHadoopWrapper implements GridGgfsHadoop { @Override public Boolean mkdirs(final IgniteFsPath path, final Map<String, String> props) throws IOException { return withReconnectHandling(new FileSystemClosure<Boolean>() { @Override public Boolean apply(GridGgfsHadoopEx hadoop, GridGgfsHandshakeResponse hndResp) - throws GridException, IOException { + throws IgniteCheckedException, IOException { return hadoop.mkdirs(path, props); } }, path); @@ -173,7 +173,7 @@ public class GridGgfsHadoopWrapper implements GridGgfsHadoop { @Override public Collection<IgniteFsFile> listFiles(final IgniteFsPath path) throws IOException { return withReconnectHandling(new FileSystemClosure<Collection<IgniteFsFile>>() { @Override public Collection<IgniteFsFile> apply(GridGgfsHadoopEx hadoop, - GridGgfsHandshakeResponse hndResp) throws GridException, IOException { + GridGgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException { return hadoop.listFiles(path); } }, path); @@ -183,7 +183,7 @@ public class GridGgfsHadoopWrapper implements GridGgfsHadoop { @Override public Collection<IgniteFsPath> listPaths(final IgniteFsPath path) throws IOException { return withReconnectHandling(new FileSystemClosure<Collection<IgniteFsPath>>() { @Override public Collection<IgniteFsPath> apply(GridGgfsHadoopEx hadoop, - GridGgfsHandshakeResponse hndResp) throws GridException, IOException { + GridGgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException { return hadoop.listPaths(path); } }, path); @@ -193,7 +193,7 @@ public class GridGgfsHadoopWrapper implements GridGgfsHadoop { @Override public GridGgfsStatus fsStatus() throws IOException { return withReconnectHandling(new FileSystemClosure<GridGgfsStatus>() { @Override public GridGgfsStatus apply(GridGgfsHadoopEx hadoop, GridGgfsHandshakeResponse hndResp) - throws GridException, IOException { + throws IgniteCheckedException, IOException { return hadoop.fsStatus(); } }); @@ -203,7 +203,7 @@ public class GridGgfsHadoopWrapper implements GridGgfsHadoop { @Override public GridGgfsHadoopStreamDelegate open(final IgniteFsPath path) throws IOException { return withReconnectHandling(new FileSystemClosure<GridGgfsHadoopStreamDelegate>() { @Override public GridGgfsHadoopStreamDelegate apply(GridGgfsHadoopEx hadoop, - GridGgfsHandshakeResponse hndResp) throws GridException, IOException { + GridGgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException { return hadoop.open(path); } }, path); @@ -214,7 +214,7 @@ public class GridGgfsHadoopWrapper implements GridGgfsHadoop { throws IOException { return withReconnectHandling(new FileSystemClosure<GridGgfsHadoopStreamDelegate>() { @Override public GridGgfsHadoopStreamDelegate apply(GridGgfsHadoopEx hadoop, - GridGgfsHandshakeResponse hndResp) throws GridException, IOException { + GridGgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException { return hadoop.open(path, seqReadsBeforePrefetch); } }, path); @@ -226,7 +226,7 @@ public class GridGgfsHadoopWrapper implements GridGgfsHadoop { throws IOException { return withReconnectHandling(new FileSystemClosure<GridGgfsHadoopStreamDelegate>() { @Override public GridGgfsHadoopStreamDelegate apply(GridGgfsHadoopEx hadoop, - GridGgfsHandshakeResponse hndResp) throws GridException, IOException { + GridGgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException { return hadoop.create(path, overwrite, colocate, replication, blockSize, props); } }, path); @@ -237,7 +237,7 @@ public class GridGgfsHadoopWrapper implements GridGgfsHadoop { @Nullable final Map<String, String> props) throws IOException { return withReconnectHandling(new FileSystemClosure<GridGgfsHadoopStreamDelegate>() { @Override public GridGgfsHadoopStreamDelegate apply(GridGgfsHadoopEx hadoop, - GridGgfsHandshakeResponse hndResp) throws GridException, IOException { + GridGgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException { return hadoop.append(path, create, props); } }, path); @@ -295,7 +295,7 @@ public class GridGgfsHadoopWrapper implements GridGgfsHadoop { err = e; } - catch (GridException e) { + catch (IgniteCheckedException e) { throw GridGgfsHadoopUtils.cast(e, path != null ? path.toString() : null); } finally { @@ -359,7 +359,7 @@ public class GridGgfsHadoopWrapper implements GridGgfsHadoop { curDelegate = new Delegate(hadoop, hadoop.handshake(logDir)); } - catch (IOException | GridException e) { + catch (IOException | IgniteCheckedException e) { if (e instanceof GridGgfsHadoopCommunicationException) hadoop.close(true); @@ -381,7 +381,7 @@ public class GridGgfsHadoopWrapper implements GridGgfsHadoop { curDelegate = new Delegate(hadoop, hadoop.handshake(logDir)); } - catch (IOException | GridException e) { + catch (IOException | IgniteCheckedException e) { if (e instanceof GridGgfsHadoopCommunicationException) hadoop.close(true); @@ -406,7 +406,7 @@ public class GridGgfsHadoopWrapper implements GridGgfsHadoop { curDelegate = new Delegate(hadoop, hadoop.handshake(logDir)); } - catch (IOException | GridException e) { + catch (IOException | IgniteCheckedException e) { if (e instanceof GridGgfsHadoopCommunicationException) hadoop.close(true); @@ -427,7 +427,7 @@ public class GridGgfsHadoopWrapper implements GridGgfsHadoop { curDelegate = new Delegate(hadoop, hadoop.handshake(logDir)); } - catch (IOException | GridException e) { + catch (IOException | IgniteCheckedException e) { if (e instanceof GridGgfsHadoopCommunicationException) hadoop.close(true); @@ -458,10 +458,10 @@ public class GridGgfsHadoopWrapper implements GridGgfsHadoop { * @param hadoop RPC handler. * @param hndResp Handshake response. * @return Result. - * @throws GridException If failed. + * @throws IgniteCheckedException If failed. * @throws IOException If failed. */ - public T apply(GridGgfsHadoopEx hadoop, GridGgfsHandshakeResponse hndResp) throws GridException, IOException; + public T apply(GridGgfsHadoopEx hadoop, GridGgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException; } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06931b4b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/GridHadoopClassLoader.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/GridHadoopClassLoader.java b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/GridHadoopClassLoader.java index 826eb37..f530058 100644 --- a/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/GridHadoopClassLoader.java +++ b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/GridHadoopClassLoader.java @@ -9,6 +9,7 @@ package org.gridgain.grid.kernal.processors.hadoop; +import org.apache.ignite.*; import org.gridgain.grid.*; import org.gridgain.grid.kernal.processors.hadoop.v2.*; import org.gridgain.grid.util.typedef.*; @@ -468,7 +469,7 @@ public class GridHadoopClassLoader extends URLClassLoader { try { hadoopJars = hadoopUrls(); } - catch (GridException e) { + catch (IgniteCheckedException e) { throw new RuntimeException(e); } @@ -492,9 +493,9 @@ public class GridHadoopClassLoader extends URLClassLoader { /** * @return Collection of jar URLs. - * @throws GridException If failed. + * @throws IgniteCheckedException If failed. */ - public static Collection<URL> hadoopUrls() throws GridException { + public static Collection<URL> hadoopUrls() throws IgniteCheckedException { Collection<URL> hadoopUrls = hadoopJars; if (hadoopUrls != null) @@ -511,7 +512,7 @@ public class GridHadoopClassLoader extends URLClassLoader { String hadoopPrefix = hadoopHome(); if (F.isEmpty(hadoopPrefix)) - throw new GridException("Failed resolve Hadoop installation location. Either HADOOP_PREFIX or " + + throw new IgniteCheckedException("Failed resolve Hadoop installation location. Either HADOOP_PREFIX or " + "HADOOP_HOME environment variables must be set."); String commonHome = getEnv("HADOOP_COMMON_HOME", hadoopPrefix + "/share/hadoop/common"); @@ -533,7 +534,7 @@ public class GridHadoopClassLoader extends URLClassLoader { addUrls(hadoopUrls, new File(mapredHome), "hadoop-mapreduce-client-core"); } catch (Exception e) { - throw new GridException(e); + throw new IgniteCheckedException(e); } hadoopJars = hadoopUrls; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06931b4b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/GridHadoopComponent.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/GridHadoopComponent.java b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/GridHadoopComponent.java index 15e279a..c44ae02 100644 --- a/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/GridHadoopComponent.java +++ b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/GridHadoopComponent.java @@ -25,7 +25,7 @@ public abstract class GridHadoopComponent { /** * @param ctx Hadoop context. */ - public void start(GridHadoopContext ctx) throws GridException { + public void start(GridHadoopContext ctx) throws IgniteCheckedException { this.ctx = ctx; log = ctx.kernalContext().log(getClass()); @@ -41,7 +41,7 @@ public abstract class GridHadoopComponent { /** * Callback invoked when all grid components are started. */ - public void onKernalStart() throws GridException { + public void onKernalStart() throws IgniteCheckedException { // No-op. } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06931b4b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/GridHadoopImpl.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/GridHadoopImpl.java b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/GridHadoopImpl.java index c95c95a..a3504f4 100644 --- a/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/GridHadoopImpl.java +++ b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/GridHadoopImpl.java @@ -9,6 +9,7 @@ package org.gridgain.grid.kernal.processors.hadoop; +import org.apache.ignite.*; import org.apache.ignite.lang.*; import org.gridgain.grid.*; import org.gridgain.grid.hadoop.*; @@ -68,7 +69,7 @@ public class GridHadoopImpl implements GridHadoop { } /** {@inheritDoc} */ - @Nullable @Override public GridHadoopJobStatus status(GridHadoopJobId jobId) throws GridException { + @Nullable @Override public GridHadoopJobStatus status(GridHadoopJobId jobId) throws IgniteCheckedException { if (busyLock.enterBusy()) { try { return proc.status(jobId); @@ -82,7 +83,7 @@ public class GridHadoopImpl implements GridHadoop { } /** {@inheritDoc} */ - @Nullable @Override public GridHadoopCounters counters(GridHadoopJobId jobId) throws GridException { + @Nullable @Override public GridHadoopCounters counters(GridHadoopJobId jobId) throws IgniteCheckedException { if (busyLock.enterBusy()) { try { return proc.counters(jobId); @@ -96,7 +97,7 @@ public class GridHadoopImpl implements GridHadoop { } /** {@inheritDoc} */ - @Nullable @Override public IgniteFuture<?> finishFuture(GridHadoopJobId jobId) throws GridException { + @Nullable @Override public IgniteFuture<?> finishFuture(GridHadoopJobId jobId) throws IgniteCheckedException { if (busyLock.enterBusy()) { try { return proc.finishFuture(jobId); @@ -110,7 +111,7 @@ public class GridHadoopImpl implements GridHadoop { } /** {@inheritDoc} */ - @Override public boolean kill(GridHadoopJobId jobId) throws GridException { + @Override public boolean kill(GridHadoopJobId jobId) throws IgniteCheckedException { if (busyLock.enterBusy()) { try { return proc.kill(jobId); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06931b4b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/GridHadoopProcessor.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/GridHadoopProcessor.java b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/GridHadoopProcessor.java index 8634e15..b1ec6db 100644 --- a/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/GridHadoopProcessor.java +++ b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/GridHadoopProcessor.java @@ -9,6 +9,7 @@ package org.gridgain.grid.kernal.processors.hadoop; +import org.apache.ignite.*; import org.apache.ignite.lang.*; import org.gridgain.grid.*; import org.gridgain.grid.hadoop.*; @@ -49,7 +50,7 @@ public class GridHadoopProcessor extends GridHadoopProcessorAdapter { } /** {@inheritDoc} */ - @Override public void start() throws GridException { + @Override public void start() throws IgniteCheckedException { if (ctx.isDaemon()) return; @@ -74,7 +75,7 @@ public class GridHadoopProcessor extends GridHadoopProcessorAdapter { ok = true; } - catch (GridException e) { + catch (IgniteCheckedException e) { U.quietAndWarn(log, e.getMessage()); } @@ -100,7 +101,7 @@ public class GridHadoopProcessor extends GridHadoopProcessorAdapter { } /** {@inheritDoc} */ - @Override public void stop(boolean cancel) throws GridException { + @Override public void stop(boolean cancel) throws IgniteCheckedException { super.stop(cancel); if (hctx == null) @@ -116,7 +117,7 @@ public class GridHadoopProcessor extends GridHadoopProcessorAdapter { } /** {@inheritDoc} */ - @Override public void onKernalStart() throws GridException { + @Override public void onKernalStart() throws IgniteCheckedException { super.onKernalStart(); if (hctx == null) @@ -176,22 +177,22 @@ public class GridHadoopProcessor extends GridHadoopProcessorAdapter { } /** {@inheritDoc} */ - @Override public GridHadoopJobStatus status(GridHadoopJobId jobId) throws GridException { + @Override public GridHadoopJobStatus status(GridHadoopJobId jobId) throws IgniteCheckedException { return hctx.jobTracker().status(jobId); } /** {@inheritDoc} */ - @Override public GridHadoopCounters counters(GridHadoopJobId jobId) throws GridException { + @Override public GridHadoopCounters counters(GridHadoopJobId jobId) throws IgniteCheckedException { return hctx.jobTracker().jobCounters(jobId); } /** {@inheritDoc} */ - @Override public IgniteFuture<?> finishFuture(GridHadoopJobId jobId) throws GridException { + @Override public IgniteFuture<?> finishFuture(GridHadoopJobId jobId) throws IgniteCheckedException { return hctx.jobTracker().finishFuture(jobId); } /** {@inheritDoc} */ - @Override public boolean kill(GridHadoopJobId jobId) throws GridException { + @Override public boolean kill(GridHadoopJobId jobId) throws IgniteCheckedException { return hctx.jobTracker().killJob(jobId); } @@ -209,11 +210,11 @@ public class GridHadoopProcessor extends GridHadoopProcessorAdapter { * Validates Grid and Hadoop configuration for correctness. * * @param hadoopCfg Hadoop configuration. - * @throws GridException If failed. + * @throws IgniteCheckedException If failed. */ - private void validate(GridHadoopConfiguration hadoopCfg) throws GridException { + private void validate(GridHadoopConfiguration hadoopCfg) throws IgniteCheckedException { if (ctx.config().isPeerClassLoadingEnabled()) - throw new GridException("Peer class loading cannot be used with Hadoop (disable it using " + + throw new IgniteCheckedException("Peer class loading cannot be used with Hadoop (disable it using " + "GridConfiguration.setPeerClassLoadingEnabled())."); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06931b4b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/GridHadoopTaskCancelledException.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/GridHadoopTaskCancelledException.java b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/GridHadoopTaskCancelledException.java index f7c6804..bf27009 100644 --- a/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/GridHadoopTaskCancelledException.java +++ b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/GridHadoopTaskCancelledException.java @@ -9,12 +9,13 @@ package org.gridgain.grid.kernal.processors.hadoop; +import org.apache.ignite.*; import org.gridgain.grid.*; /** * Exception that throws when the task is cancelling. */ -public class GridHadoopTaskCancelledException extends GridRuntimeException { +public class GridHadoopTaskCancelledException extends IgniteException { /** */ private static final long serialVersionUID = 0L; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06931b4b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/GridHadoopUtils.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/GridHadoopUtils.java b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/GridHadoopUtils.java index b031f98..502e500 100644 --- a/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/GridHadoopUtils.java +++ b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/GridHadoopUtils.java @@ -14,6 +14,7 @@ import org.apache.hadoop.fs.*; import org.apache.hadoop.io.*; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.*; +import org.apache.ignite.*; import org.gridgain.grid.*; import org.gridgain.grid.hadoop.*; import org.gridgain.grid.kernal.processors.hadoop.v2.*; @@ -176,11 +177,11 @@ public class GridHadoopUtils { * * @param attr Attribute name. * @param msg Message for creation of exception. - * @throws org.gridgain.grid.GridException If attribute is set. + * @throws IgniteCheckedException If attribute is set. */ - public static void ensureNotSet(Configuration cfg, String attr, String msg) throws GridException { + public static void ensureNotSet(Configuration cfg, String attr, String msg) throws IgniteCheckedException { if (cfg.get(attr) != null) - throw new GridException(attr + " is incompatible with " + msg + " mode."); + throw new IgniteCheckedException(attr + " is incompatible with " + msg + " mode."); } /** @@ -188,9 +189,9 @@ public class GridHadoopUtils { * * @param cfg Hadoop configuration. * @return Job info. - * @throws GridException If failed. + * @throws IgniteCheckedException If failed. */ - public static GridHadoopDefaultJobInfo createJobInfo(Configuration cfg) throws GridException { + public static GridHadoopDefaultJobInfo createJobInfo(Configuration cfg) throws IgniteCheckedException { JobConf jobConf = new JobConf(cfg); boolean hasCombiner = jobConf.get("mapred.combiner.class") != null @@ -249,18 +250,18 @@ public class GridHadoopUtils { } /** - * Throws new {@link GridException} with original exception is serialized into string. + * Throws new {@link IgniteCheckedException} with original exception is serialized into string. * This is needed to transfer error outside the current class loader. * * @param e Original exception. - * @return GridException New exception. + * @return IgniteCheckedException New exception. */ - public static GridException transformException(Throwable e) { + public static IgniteCheckedException transformException(Throwable e) { ByteArrayOutputStream os = new ByteArrayOutputStream(); e.printStackTrace(new PrintStream(os, true)); - return new GridException(os.toString()); + return new IgniteCheckedException(os.toString()); } /** @@ -269,9 +270,9 @@ public class GridHadoopUtils { * @param locNodeId Local node ID. * @param jobId Job ID. * @return Working directory for job. - * @throws GridException If Failed. + * @throws IgniteCheckedException If Failed. */ - public static File jobLocalDir(UUID locNodeId, GridHadoopJobId jobId) throws GridException { + public static File jobLocalDir(UUID locNodeId, GridHadoopJobId jobId) throws IgniteCheckedException { return new File(new File(U.resolveWorkDirectory("hadoop", false), "node-" + locNodeId), "job_" + jobId); } @@ -281,9 +282,9 @@ public class GridHadoopUtils { * @param locNodeId Local node ID. * @param info Task info. * @return Working directory for task. - * @throws GridException If Failed. + * @throws IgniteCheckedException If Failed. */ - public static File taskLocalDir(UUID locNodeId, GridHadoopTaskInfo info) throws GridException { + public static File taskLocalDir(UUID locNodeId, GridHadoopTaskInfo info) throws IgniteCheckedException { File jobLocDir = jobLocalDir(locNodeId, info.jobId()); return new File(jobLocDir, info.type() + "_" + info.taskNumber() + "_" + info.attempt()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06931b4b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/counter/GridHadoopCountersImpl.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/counter/GridHadoopCountersImpl.java b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/counter/GridHadoopCountersImpl.java index 24dbdcc..988674e 100644 --- a/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/counter/GridHadoopCountersImpl.java +++ b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/counter/GridHadoopCountersImpl.java @@ -9,6 +9,7 @@ package org.gridgain.grid.kernal.processors.hadoop.counter; +import org.apache.ignite.*; import org.gridgain.grid.*; import org.gridgain.grid.hadoop.*; import org.gridgain.grid.util.lang.*; @@ -71,7 +72,7 @@ public class GridHadoopCountersImpl implements GridHadoopCounters, Externalizabl return (T)constructor.newInstance(grp, name); } catch (Exception e) { - throw new GridRuntimeException(e); + throw new IgniteException(e); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06931b4b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/counter/GridHadoopFSCounterWriter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/counter/GridHadoopFSCounterWriter.java b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/counter/GridHadoopFSCounterWriter.java index 8facee1..5b428f5 100644 --- a/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/counter/GridHadoopFSCounterWriter.java +++ b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/counter/GridHadoopFSCounterWriter.java @@ -11,6 +11,7 @@ package org.gridgain.grid.kernal.processors.hadoop.counter; import org.apache.hadoop.conf.*; import org.apache.hadoop.fs.*; +import org.apache.ignite.*; import org.gridgain.grid.*; import org.gridgain.grid.hadoop.*; import org.gridgain.grid.util.typedef.*; @@ -39,7 +40,7 @@ public class GridHadoopFSCounterWriter implements GridHadoopCounterWriter { /** {@inheritDoc} */ @Override public void write(GridHadoopJobInfo jobInfo, GridHadoopJobId jobId, GridHadoopCounters cntrs) - throws GridException { + throws IgniteCheckedException { Configuration hadoopCfg = new Configuration(); @@ -76,7 +77,7 @@ public class GridHadoopFSCounterWriter implements GridHadoopCounterWriter { } } catch (IOException e) { - throw new GridException(e); + throw new IgniteCheckedException(e); } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06931b4b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/jobtracker/GridHadoopJobTracker.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/jobtracker/GridHadoopJobTracker.java b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/jobtracker/GridHadoopJobTracker.java index cefa7a6..c2ee568 100644 --- a/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/jobtracker/GridHadoopJobTracker.java +++ b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/jobtracker/GridHadoopJobTracker.java @@ -9,6 +9,7 @@ package org.gridgain.grid.kernal.processors.hadoop.jobtracker; +import org.apache.ignite.*; import org.apache.ignite.events.*; import org.apache.ignite.lang.*; import org.gridgain.grid.*; @@ -71,14 +72,14 @@ public class GridHadoopJobTracker extends GridHadoopComponent { try { gridFut.get(); } - catch (GridException e) { + catch (IgniteCheckedException e) { U.error(log, "Failed to transform system cache.", e); } } }; /** {@inheritDoc} */ - @Override public void start(GridHadoopContext ctx) throws GridException { + @Override public void start(GridHadoopContext ctx) throws IgniteCheckedException { super.start(ctx); busyLock = new GridSpinReadWriteLock(); @@ -106,7 +107,7 @@ public class GridHadoopJobTracker extends GridHadoopComponent { try { ctx.kernalContext().resource().injectGeneric(mrPlanner); } - catch (GridException e) { // Must not happen. + catch (IgniteCheckedException e) { // Must not happen. U.error(log, "Failed to inject resources.", e); throw new IllegalStateException(e); @@ -122,7 +123,7 @@ public class GridHadoopJobTracker extends GridHadoopComponent { /** {@inheritDoc} */ @SuppressWarnings("deprecation") - @Override public void onKernalStart() throws GridException { + @Override public void onKernalStart() throws IgniteCheckedException { super.onKernalStart(); GridCacheContinuousQuery<GridHadoopJobId, GridHadoopJobMetadata> qry = jobMetaCache().queries().createContinuousQuery(); @@ -137,7 +138,7 @@ public class GridHadoopJobTracker extends GridHadoopComponent { try { // Must process query callback in a separate thread to avoid deadlocks. evtProcSvc.submit(new EventHandler() { - @Override protected void body() throws GridException { + @Override protected void body() throws IgniteCheckedException { processJobMetadataUpdates(evts); } }); @@ -182,7 +183,7 @@ public class GridHadoopJobTracker extends GridHadoopComponent { // Fail all pending futures. for (GridFutureAdapter<GridHadoopJobId> fut : activeFinishFuts.values()) - fut.onDone(new GridException("Failed to execute Hadoop map-reduce job (grid is stopping).")); + fut.onDone(new IgniteCheckedException("Failed to execute Hadoop map-reduce job (grid is stopping).")); } /** @@ -195,7 +196,7 @@ public class GridHadoopJobTracker extends GridHadoopComponent { @SuppressWarnings("unchecked") public IgniteFuture<GridHadoopJobId> submit(GridHadoopJobId jobId, GridHadoopJobInfo info) { if (!busyLock.tryReadLock()) { - return new GridFinishedFutureEx<>(new GridException("Failed to execute map-reduce job " + + return new GridFinishedFutureEx<>(new IgniteCheckedException("Failed to execute map-reduce job " + "(grid is stopping): " + info)); } @@ -203,7 +204,7 @@ public class GridHadoopJobTracker extends GridHadoopComponent { long jobPrepare = U.currentTimeMillis(); if (jobs.containsKey(jobId) || jobMetaCache().containsKey(jobId)) - throw new GridException("Failed to submit job. Job with the same ID already exists: " + jobId); + throw new IgniteCheckedException("Failed to submit job. Job with the same ID already exists: " + jobId); GridHadoopJob job = job(jobId, info); @@ -235,11 +236,11 @@ public class GridHadoopJobTracker extends GridHadoopComponent { perfCntr.onJobStart(jobStart); if (jobMetaCache().putIfAbsent(jobId, meta) != null) - throw new GridException("Failed to submit job. Job with the same ID already exists: " + jobId); + throw new IgniteCheckedException("Failed to submit job. Job with the same ID already exists: " + jobId); return completeFut; } - catch (GridException e) { + catch (IgniteCheckedException e) { U.error(log, "Failed to submit job: " + jobId, e); return new GridFinishedFutureEx<>(e); @@ -279,7 +280,7 @@ public class GridHadoopJobTracker extends GridHadoopComponent { * @param jobId Job ID to get status for. * @return Job status for given job ID or {@code null} if job was not found. */ - @Nullable public GridHadoopJobStatus status(GridHadoopJobId jobId) throws GridException { + @Nullable public GridHadoopJobStatus status(GridHadoopJobId jobId) throws IgniteCheckedException { if (!busyLock.tryReadLock()) return null; // Grid is stopping. @@ -298,9 +299,9 @@ public class GridHadoopJobTracker extends GridHadoopComponent { * * @param jobId Job ID. * @return Finish future or {@code null}. - * @throws GridException If failed. + * @throws IgniteCheckedException If failed. */ - @Nullable public IgniteFuture<?> finishFuture(GridHadoopJobId jobId) throws GridException { + @Nullable public IgniteFuture<?> finishFuture(GridHadoopJobId jobId) throws IgniteCheckedException { if (!busyLock.tryReadLock()) return null; // Grid is stopping. @@ -352,9 +353,9 @@ public class GridHadoopJobTracker extends GridHadoopComponent { * * @param jobId Job ID. * @return Job plan. - * @throws GridException If failed. + * @throws IgniteCheckedException If failed. */ - public GridHadoopMapReducePlan plan(GridHadoopJobId jobId) throws GridException { + public GridHadoopMapReducePlan plan(GridHadoopJobId jobId) throws IgniteCheckedException { if (!busyLock.tryReadLock()) return null; @@ -569,12 +570,12 @@ public class GridHadoopJobTracker extends GridHadoopComponent { } if (cancelSplits != null || cancelReducers != null) - jobMetaCache().transform(meta.jobId(), new CancelJobClosure(null, new GridException( + jobMetaCache().transform(meta.jobId(), new CancelJobClosure(null, new IgniteCheckedException( "One or more nodes participating in map-reduce job execution failed."), cancelSplits, cancelReducers)); } } - catch (GridException e) { + catch (IgniteCheckedException e) { U.error(log, "Failed to cancel job: " + meta, e); } } @@ -584,7 +585,7 @@ public class GridHadoopJobTracker extends GridHadoopComponent { /** * @param updated Updated cache entries. */ - private void processJobMetadataUpdates(Iterable<Map.Entry<GridHadoopJobId, GridHadoopJobMetadata>> updated) throws GridException { + private void processJobMetadataUpdates(Iterable<Map.Entry<GridHadoopJobId, GridHadoopJobMetadata>> updated) throws IgniteCheckedException { UUID locNodeId = ctx.localNodeId(); for (Map.Entry<GridHadoopJobId, GridHadoopJobMetadata> entry : updated) { @@ -601,7 +602,7 @@ public class GridHadoopJobTracker extends GridHadoopComponent { try { ctx.taskExecutor().onJobStateChanged(meta); } - catch (GridException e) { + catch (IgniteCheckedException e) { U.error(log, "Failed to process job state changed callback (will fail the job) " + "[locNodeId=" + locNodeId + ", jobId=" + jobId + ", meta=" + meta + ']', e); @@ -643,10 +644,10 @@ public class GridHadoopJobTracker extends GridHadoopComponent { * @param jobId Job ID. * @param meta Job metadata. * @param locNodeId Local node ID. - * @throws GridException If failed. + * @throws IgniteCheckedException If failed. */ private void processJobMetaUpdate(GridHadoopJobId jobId, GridHadoopJobMetadata meta, UUID locNodeId) - throws GridException { + throws IgniteCheckedException { JobLocalState state = activeJobs.get(jobId); GridHadoopJob job = job(jobId, meta.jobInfo()); @@ -920,9 +921,9 @@ public class GridHadoopJobTracker extends GridHadoopComponent { * @param jobId Job ID. * @param jobInfo Job info. * @return Job. - * @throws GridException If failed. + * @throws IgniteCheckedException If failed. */ - @Nullable public GridHadoopJob job(GridHadoopJobId jobId, @Nullable GridHadoopJobInfo jobInfo) throws GridException { + @Nullable public GridHadoopJob job(GridHadoopJobId jobId, @Nullable GridHadoopJobInfo jobInfo) throws IgniteCheckedException { GridFutureAdapterEx<GridHadoopJob> fut = jobs.get(jobId); if (fut != null || (fut = jobs.putIfAbsent(jobId, new GridFutureAdapterEx<GridHadoopJob>())) != null) @@ -937,7 +938,7 @@ public class GridHadoopJobTracker extends GridHadoopComponent { GridHadoopJobMetadata meta = jobMetaCache().get(jobId); if (meta == null) - throw new GridException("Failed to find job metadata for ID: " + jobId); + throw new IgniteCheckedException("Failed to find job metadata for ID: " + jobId); jobInfo = meta.jobInfo(); } @@ -950,7 +951,7 @@ public class GridHadoopJobTracker extends GridHadoopComponent { return job; } - catch (GridException e) { + catch (IgniteCheckedException e) { fut.onDone(e); jobs.remove(jobId, fut); @@ -959,7 +960,7 @@ public class GridHadoopJobTracker extends GridHadoopComponent { try { job.dispose(false); } - catch (GridException e0) { + catch (IgniteCheckedException e0) { U.error(log, "Failed to dispose job: " + jobId, e0); } } @@ -973,9 +974,9 @@ public class GridHadoopJobTracker extends GridHadoopComponent { * * @param jobId Job ID. * @return {@code True} if job was killed. - * @throws GridException If failed. + * @throws IgniteCheckedException If failed. */ - public boolean killJob(GridHadoopJobId jobId) throws GridException { + public boolean killJob(GridHadoopJobId jobId) throws IgniteCheckedException { if (!busyLock.tryReadLock()) return false; // Grid is stopping. @@ -1012,9 +1013,9 @@ public class GridHadoopJobTracker extends GridHadoopComponent { * * @param jobId Job identifier. * @return Job counters or {@code null} if job cannot be found. - * @throws GridException If failed. + * @throws IgniteCheckedException If failed. */ - @Nullable public GridHadoopCounters jobCounters(GridHadoopJobId jobId) throws GridException { + @Nullable public GridHadoopCounters jobCounters(GridHadoopJobId jobId) throws IgniteCheckedException { if (!busyLock.tryReadLock()) return null; @@ -1149,7 +1150,7 @@ public class GridHadoopJobTracker extends GridHadoopComponent { try { f.get(); } - catch (GridException e) { + catch (IgniteCheckedException e) { err = e; } } @@ -1199,7 +1200,7 @@ public class GridHadoopJobTracker extends GridHadoopComponent { try { f.get(); } - catch (GridException e) { + catch (IgniteCheckedException e) { err = e; } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06931b4b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/planner/GridHadoopDefaultMapReducePlanner.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/planner/GridHadoopDefaultMapReducePlanner.java b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/planner/GridHadoopDefaultMapReducePlanner.java index 2d9358c..2db01ac 100644 --- a/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/planner/GridHadoopDefaultMapReducePlanner.java +++ b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/planner/GridHadoopDefaultMapReducePlanner.java @@ -41,7 +41,7 @@ public class GridHadoopDefaultMapReducePlanner implements GridHadoopMapReducePla /** {@inheritDoc} */ @Override public GridHadoopMapReducePlan preparePlan(GridHadoopJob job, Collection<ClusterNode> top, - @Nullable GridHadoopMapReducePlan oldPlan) throws GridException { + @Nullable GridHadoopMapReducePlan oldPlan) throws IgniteCheckedException { // Convert collection of topology nodes to collection of topology node IDs. Collection<UUID> topIds = new HashSet<>(top.size(), 1.0f); @@ -53,7 +53,7 @@ public class GridHadoopDefaultMapReducePlanner implements GridHadoopMapReducePla int rdcCnt = job.info().reducers(); if (rdcCnt < 0) - throw new GridException("Number of reducers must be non-negative, actual: " + rdcCnt); + throw new IgniteCheckedException("Number of reducers must be non-negative, actual: " + rdcCnt); Map<UUID, int[]> reducers = reducers(top, mappers, rdcCnt); @@ -67,10 +67,10 @@ public class GridHadoopDefaultMapReducePlanner implements GridHadoopMapReducePla * @param topIds Topology node IDs. * @param splits Splits. * @return Mappers map. - * @throws GridException If failed. + * @throws IgniteCheckedException If failed. */ private Map<UUID, Collection<GridHadoopInputSplit>> mappers(Collection<ClusterNode> top, Collection<UUID> topIds, - Iterable<GridHadoopInputSplit> splits) throws GridException { + Iterable<GridHadoopInputSplit> splits) throws IgniteCheckedException { Map<UUID, Collection<GridHadoopInputSplit>> mappers = new HashMap<>(); Map<String, Collection<UUID>> nodes = hosts(top); @@ -141,7 +141,7 @@ public class GridHadoopDefaultMapReducePlanner implements GridHadoopMapReducePla */ @SuppressWarnings("unchecked") private UUID nodeForSplit(GridHadoopInputSplit split, Collection<UUID> topIds, Map<String, Collection<UUID>> nodes, - Map<UUID, Integer> nodeLoads) throws GridException { + Map<UUID, Integer> nodeLoads) throws IgniteCheckedException { if (split instanceof GridHadoopFileBlock) { GridHadoopFileBlock split0 = (GridHadoopFileBlock)split; @@ -278,10 +278,10 @@ public class GridHadoopDefaultMapReducePlanner implements GridHadoopMapReducePla * @param mappers Mappers map. * @param reducerCnt Reducers count. * @return Reducers map. - * @throws GridException If failed. + * @throws IgniteCheckedException If failed. */ private Map<UUID, int[]> reducers(Collection<ClusterNode> top, - Map<UUID, Collection<GridHadoopInputSplit>> mappers, int reducerCnt) throws GridException { + Map<UUID, Collection<GridHadoopInputSplit>> mappers, int reducerCnt) throws IgniteCheckedException { // Determine initial node weights. int totalWeight = 0; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06931b4b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/proto/GridHadoopProtocolJobCountersTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/proto/GridHadoopProtocolJobCountersTask.java b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/proto/GridHadoopProtocolJobCountersTask.java index 918f776..f8ee45b 100644 --- a/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/proto/GridHadoopProtocolJobCountersTask.java +++ b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/proto/GridHadoopProtocolJobCountersTask.java @@ -9,6 +9,7 @@ package org.gridgain.grid.kernal.processors.hadoop.proto; +import org.apache.ignite.*; import org.apache.ignite.compute.*; import org.gridgain.grid.*; import org.gridgain.grid.hadoop.*; @@ -24,7 +25,7 @@ public class GridHadoopProtocolJobCountersTask extends GridHadoopProtocolTaskAda /** {@inheritDoc} */ @Override public GridHadoopCounters run(ComputeJobContext jobCtx, GridHadoop hadoop, - GridHadoopProtocolTaskArguments args) throws GridException { + GridHadoopProtocolTaskArguments args) throws IgniteCheckedException { UUID nodeId = UUID.fromString(args.<String>get(0)); Integer id = args.get(1); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06931b4b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/proto/GridHadoopProtocolJobStatusTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/proto/GridHadoopProtocolJobStatusTask.java b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/proto/GridHadoopProtocolJobStatusTask.java index 327e358..497729a 100644 --- a/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/proto/GridHadoopProtocolJobStatusTask.java +++ b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/proto/GridHadoopProtocolJobStatusTask.java @@ -9,6 +9,7 @@ package org.gridgain.grid.kernal.processors.hadoop.proto; +import org.apache.ignite.*; import org.apache.ignite.compute.*; import org.apache.ignite.lang.*; import org.gridgain.grid.*; @@ -32,7 +33,7 @@ public class GridHadoopProtocolJobStatusTask extends GridHadoopProtocolTaskAdapt /** {@inheritDoc} */ @Override public GridHadoopJobStatus run(final ComputeJobContext jobCtx, GridHadoop hadoop, - GridHadoopProtocolTaskArguments args) throws GridException { + GridHadoopProtocolTaskArguments args) throws IgniteCheckedException { UUID nodeId = UUID.fromString(args.<String>get(0)); Integer id = args.get(1); Long pollDelay = args.get(2); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06931b4b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/proto/GridHadoopProtocolKillJobTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/proto/GridHadoopProtocolKillJobTask.java b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/proto/GridHadoopProtocolKillJobTask.java index 6b0a9a9..55053b5 100644 --- a/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/proto/GridHadoopProtocolKillJobTask.java +++ b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/proto/GridHadoopProtocolKillJobTask.java @@ -9,6 +9,7 @@ package org.gridgain.grid.kernal.processors.hadoop.proto; +import org.apache.ignite.*; import org.apache.ignite.compute.*; import org.gridgain.grid.*; import org.gridgain.grid.hadoop.*; @@ -24,7 +25,7 @@ public class GridHadoopProtocolKillJobTask extends GridHadoopProtocolTaskAdapter /** {@inheritDoc} */ @Override public Boolean run(ComputeJobContext jobCtx, GridHadoop hadoop, - GridHadoopProtocolTaskArguments args) throws GridException { + GridHadoopProtocolTaskArguments args) throws IgniteCheckedException { UUID nodeId = UUID.fromString(args.<String>get(0)); Integer id = args.get(1); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06931b4b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/proto/GridHadoopProtocolNextTaskIdTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/proto/GridHadoopProtocolNextTaskIdTask.java b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/proto/GridHadoopProtocolNextTaskIdTask.java index d41d355..bade802 100644 --- a/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/proto/GridHadoopProtocolNextTaskIdTask.java +++ b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/proto/GridHadoopProtocolNextTaskIdTask.java @@ -9,6 +9,7 @@ package org.gridgain.grid.kernal.processors.hadoop.proto; +import org.apache.ignite.*; import org.apache.ignite.compute.*; import org.gridgain.grid.*; import org.gridgain.grid.hadoop.*; @@ -22,7 +23,7 @@ public class GridHadoopProtocolNextTaskIdTask extends GridHadoopProtocolTaskAdap /** {@inheritDoc} */ @Override public GridHadoopJobId run(ComputeJobContext jobCtx, GridHadoop hadoop, - GridHadoopProtocolTaskArguments args) throws GridException { + GridHadoopProtocolTaskArguments args) throws IgniteCheckedException { return hadoop.nextJobId(); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06931b4b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/proto/GridHadoopProtocolSubmitJobTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/proto/GridHadoopProtocolSubmitJobTask.java b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/proto/GridHadoopProtocolSubmitJobTask.java index 598cb7d..434372c 100644 --- a/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/proto/GridHadoopProtocolSubmitJobTask.java +++ b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/proto/GridHadoopProtocolSubmitJobTask.java @@ -9,6 +9,7 @@ package org.gridgain.grid.kernal.processors.hadoop.proto; +import org.apache.ignite.*; import org.apache.ignite.compute.*; import org.gridgain.grid.*; import org.gridgain.grid.hadoop.*; @@ -26,7 +27,7 @@ public class GridHadoopProtocolSubmitJobTask extends GridHadoopProtocolTaskAdapt /** {@inheritDoc} */ @Override public GridHadoopJobStatus run(ComputeJobContext jobCtx, GridHadoop hadoop, - GridHadoopProtocolTaskArguments args) throws GridException { + GridHadoopProtocolTaskArguments args) throws IgniteCheckedException { UUID nodeId = UUID.fromString(args.<String>get(0)); Integer id = args.get(1); GridHadoopDefaultJobInfo info = args.get(2); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06931b4b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/proto/GridHadoopProtocolTaskAdapter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/proto/GridHadoopProtocolTaskAdapter.java b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/proto/GridHadoopProtocolTaskAdapter.java index 65e14e0..f660c0c 100644 --- a/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/proto/GridHadoopProtocolTaskAdapter.java +++ b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/proto/GridHadoopProtocolTaskAdapter.java @@ -26,18 +26,18 @@ import java.util.*; public abstract class GridHadoopProtocolTaskAdapter<R> implements ComputeTask<GridHadoopProtocolTaskArguments, R> { /** {@inheritDoc} */ @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, - @Nullable GridHadoopProtocolTaskArguments arg) throws GridException { + @Nullable GridHadoopProtocolTaskArguments arg) throws IgniteCheckedException { return Collections.singletonMap(new Job(arg), subgrid.get(0)); } /** {@inheritDoc} */ @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) - throws GridException { + throws IgniteCheckedException { return ComputeJobResultPolicy.REDUCE; } /** {@inheritDoc} */ - @Nullable @Override public R reduce(List<ComputeJobResult> results) throws GridException { + @Nullable @Override public R reduce(List<ComputeJobResult> results) throws IgniteCheckedException { if (!F.isEmpty(results)) { ComputeJobResult res = results.get(0); @@ -79,7 +79,7 @@ public abstract class GridHadoopProtocolTaskAdapter<R> implements ComputeTask<Gr } /** {@inheritDoc} */ - @Nullable @Override public Object execute() throws GridException { + @Nullable @Override public Object execute() throws IgniteCheckedException { return run(jobCtx, ignite.hadoop(), args); } } @@ -91,8 +91,8 @@ public abstract class GridHadoopProtocolTaskAdapter<R> implements ComputeTask<Gr * @param hadoop Hadoop facade. * @param args Arguments. * @return Job result. - * @throws GridException If failed. + * @throws IgniteCheckedException If failed. */ public abstract R run(ComputeJobContext jobCtx, GridHadoop hadoop, GridHadoopProtocolTaskArguments args) - throws GridException; + throws IgniteCheckedException; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06931b4b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/shuffle/GridHadoopShuffle.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/shuffle/GridHadoopShuffle.java b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/shuffle/GridHadoopShuffle.java index 795d5a3..55e54ad 100644 --- a/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/shuffle/GridHadoopShuffle.java +++ b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/shuffle/GridHadoopShuffle.java @@ -9,6 +9,7 @@ package org.gridgain.grid.kernal.processors.hadoop.shuffle; +import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.lang.*; import org.gridgain.grid.*; @@ -36,7 +37,7 @@ public class GridHadoopShuffle extends GridHadoopComponent { protected final GridUnsafeMemory mem = new GridUnsafeMemory(0); /** {@inheritDoc} */ - @Override public void start(GridHadoopContext ctx) throws GridException { + @Override public void start(GridHadoopContext ctx) throws IgniteCheckedException { super.start(ctx); ctx.kernalContext().io().addUserMessageListener(GridTopic.TOPIC_HADOOP, @@ -57,7 +58,7 @@ public class GridHadoopShuffle extends GridHadoopComponent { try { job.close(); } - catch (GridException e) { + catch (IgniteCheckedException e) { U.error(log, "Failed to close job.", e); } } @@ -70,9 +71,9 @@ public class GridHadoopShuffle extends GridHadoopComponent { * * @param jobId Job ID. * @return Created shuffle job. - * @throws GridException If job creation failed. + * @throws IgniteCheckedException If job creation failed. */ - private GridHadoopShuffleJob<UUID> newJob(GridHadoopJobId jobId) throws GridException { + private GridHadoopShuffleJob<UUID> newJob(GridHadoopJobId jobId) throws IgniteCheckedException { GridHadoopMapReducePlan plan = ctx.jobTracker().plan(jobId); GridHadoopShuffleJob<UUID> job = new GridHadoopShuffleJob<>(ctx.localNodeId(), log, @@ -98,9 +99,9 @@ public class GridHadoopShuffle extends GridHadoopComponent { /** * @param nodeId Node ID to send message to. * @param msg Message to send. - * @throws GridException If send failed. + * @throws IgniteCheckedException If send failed. */ - private void send0(UUID nodeId, Object msg) throws GridException { + private void send0(UUID nodeId, Object msg) throws IgniteCheckedException { ClusterNode node = ctx.kernalContext().discovery().node(nodeId); ctx.kernalContext().io().sendUserMessage(F.asList(node), msg, GridTopic.TOPIC_HADOOP, false, 0); @@ -110,7 +111,7 @@ public class GridHadoopShuffle extends GridHadoopComponent { * @param jobId Task info. * @return Shuffle job. */ - private GridHadoopShuffleJob<UUID> job(GridHadoopJobId jobId) throws GridException { + private GridHadoopShuffleJob<UUID> job(GridHadoopJobId jobId) throws IgniteCheckedException { GridHadoopShuffleJob<UUID> res = jobs.get(jobId); if (res == null) { @@ -138,7 +139,7 @@ public class GridHadoopShuffle extends GridHadoopComponent { private void startSending(GridHadoopShuffleJob<UUID> shuffleJob) { shuffleJob.startSending(ctx.kernalContext().gridName(), new IgniteInClosure2X<UUID, GridHadoopShuffleMessage>() { - @Override public void applyx(UUID dest, GridHadoopShuffleMessage msg) throws GridException { + @Override public void applyx(UUID dest, GridHadoopShuffleMessage msg) throws IgniteCheckedException { send0(dest, msg); } } @@ -159,7 +160,7 @@ public class GridHadoopShuffle extends GridHadoopComponent { try { job(m.jobId()).onShuffleMessage(m); } - catch (GridException e) { + catch (IgniteCheckedException e) { U.error(log, "Message handling failed.", e); } @@ -167,7 +168,7 @@ public class GridHadoopShuffle extends GridHadoopComponent { // Reply with ack. send0(src, new GridHadoopShuffleAck(m.id(), m.jobId())); } - catch (GridException e) { + catch (IgniteCheckedException e) { U.error(log, "Failed to reply back to shuffle message sender [snd=" + src + ", msg=" + msg + ']', e); } } @@ -177,7 +178,7 @@ public class GridHadoopShuffle extends GridHadoopComponent { try { job(m.jobId()).onShuffleAck(m); } - catch (GridException e) { + catch (IgniteCheckedException e) { U.error(log, "Message handling failed.", e); } } @@ -192,7 +193,7 @@ public class GridHadoopShuffle extends GridHadoopComponent { * @param taskCtx Task info. * @return Output. */ - public GridHadoopTaskOutput output(GridHadoopTaskContext taskCtx) throws GridException { + public GridHadoopTaskOutput output(GridHadoopTaskContext taskCtx) throws IgniteCheckedException { return job(taskCtx.taskInfo().jobId()).output(taskCtx); } @@ -200,7 +201,7 @@ public class GridHadoopShuffle extends GridHadoopComponent { * @param taskCtx Task info. * @return Input. */ - public GridHadoopTaskInput input(GridHadoopTaskContext taskCtx) throws GridException { + public GridHadoopTaskInput input(GridHadoopTaskContext taskCtx) throws IgniteCheckedException { return job(taskCtx.taskInfo().jobId()).input(taskCtx); } @@ -214,7 +215,7 @@ public class GridHadoopShuffle extends GridHadoopComponent { try { job.close(); } - catch (GridException e) { + catch (IgniteCheckedException e) { U.error(log, "Failed to close job: " + jobId, e); } } @@ -235,7 +236,7 @@ public class GridHadoopShuffle extends GridHadoopComponent { try { return job.flush(); } - catch (GridException e) { + catch (IgniteCheckedException e) { return new GridFinishedFutureEx<>(e); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06931b4b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/shuffle/GridHadoopShuffleJob.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/shuffle/GridHadoopShuffleJob.java b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/shuffle/GridHadoopShuffleJob.java index e0af2fa..1a8614f 100644 --- a/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/shuffle/GridHadoopShuffleJob.java +++ b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/shuffle/GridHadoopShuffleJob.java @@ -88,10 +88,10 @@ public class GridHadoopShuffleJob<T> implements AutoCloseable { * @param mem Memory. * @param totalReducerCnt Amount of reducers in the Job. * @param locReducers Reducers will work on current node. - * @throws GridException If error. + * @throws IgniteCheckedException If error. */ public GridHadoopShuffleJob(T locReduceAddr, IgniteLogger log, GridHadoopJob job, GridUnsafeMemory mem, - int totalReducerCnt, int[] locReducers) throws GridException { + int totalReducerCnt, int[] locReducers) throws IgniteCheckedException { this.locReduceAddr = locReduceAddr; this.job = job; this.mem = mem; @@ -153,7 +153,7 @@ public class GridHadoopShuffleJob<T> implements AutoCloseable { collectUpdatesAndSend(false); } } - catch (GridException e) { + catch (IgniteCheckedException e) { throw new IllegalStateException(e); } } @@ -190,9 +190,9 @@ public class GridHadoopShuffleJob<T> implements AutoCloseable { /** * @param msg Message. - * @throws GridException Exception. + * @throws IgniteCheckedException Exception. */ - public void onShuffleMessage(GridHadoopShuffleMessage msg) throws GridException { + public void onShuffleMessage(GridHadoopShuffleMessage msg) throws IgniteCheckedException { assert msg.buffer() != null; assert msg.offset() > 0; @@ -213,7 +213,7 @@ public class GridHadoopShuffleJob<T> implements AutoCloseable { /** */ private GridHadoopMultimap.Key key; - @Override public void onKey(byte[] buf, int off, int len) throws GridException { + @Override public void onKey(byte[] buf, int off, int len) throws IgniteCheckedException { dataInput.bytes(buf, off, off + len); key = adder.addKey(dataInput, key); @@ -278,7 +278,7 @@ public class GridHadoopShuffleJob<T> implements AutoCloseable { /** * Sends map updates to remote reducers. */ - private void collectUpdatesAndSend(boolean flush) throws GridException { + private void collectUpdatesAndSend(boolean flush) throws IgniteCheckedException { for (int i = 0; i < maps.length(); i++) { GridHadoopMultimap map = maps.get(i); @@ -384,7 +384,7 @@ public class GridHadoopShuffleJob<T> implements AutoCloseable { // Otherwise flush() should fail. sentMsgs.remove(msgId); } - catch (GridException e) { + catch (IgniteCheckedException e) { log.error("Failed to send message.", e); } } @@ -395,7 +395,7 @@ public class GridHadoopShuffleJob<T> implements AutoCloseable { } /** {@inheritDoc} */ - @Override public void close() throws GridException { + @Override public void close() throws IgniteCheckedException { if (snd != null) { snd.cancel(); @@ -426,7 +426,7 @@ public class GridHadoopShuffleJob<T> implements AutoCloseable { * @return Future. */ @SuppressWarnings("unchecked") - public IgniteFuture<?> flush() throws GridException { + public IgniteFuture<?> flush() throws IgniteCheckedException { if (log.isDebugEnabled()) log.debug("Flushing job " + job.id() + " on address " + locReduceAddr); @@ -477,9 +477,9 @@ public class GridHadoopShuffleJob<T> implements AutoCloseable { /** * @param taskCtx Task context. * @return Output. - * @throws GridException If failed. + * @throws IgniteCheckedException If failed. */ - public GridHadoopTaskOutput output(GridHadoopTaskContext taskCtx) throws GridException { + public GridHadoopTaskOutput output(GridHadoopTaskContext taskCtx) throws IgniteCheckedException { switch (taskCtx.taskInfo().type()) { case MAP: assert !job.info().hasCombiner() : "The output creation is allowed if combiner has not been defined."; @@ -495,10 +495,10 @@ public class GridHadoopShuffleJob<T> implements AutoCloseable { /** * @param taskCtx Task context. * @return Input. - * @throws GridException If failed. + * @throws IgniteCheckedException If failed. */ @SuppressWarnings("unchecked") - public GridHadoopTaskInput input(GridHadoopTaskContext taskCtx) throws GridException { + public GridHadoopTaskInput input(GridHadoopTaskContext taskCtx) throws IgniteCheckedException { switch (taskCtx.taskInfo().type()) { case REDUCE: int reducer = taskCtx.taskInfo().taskNumber(); @@ -548,7 +548,7 @@ public class GridHadoopShuffleJob<T> implements AutoCloseable { * Constructor. * @param taskCtx Task context. */ - private PartitionedOutput(GridHadoopTaskContext taskCtx) throws GridException { + private PartitionedOutput(GridHadoopTaskContext taskCtx) throws IgniteCheckedException { this.taskCtx = taskCtx; if (needPartitioner) @@ -556,14 +556,14 @@ public class GridHadoopShuffleJob<T> implements AutoCloseable { } /** {@inheritDoc} */ - @Override public void write(Object key, Object val) throws GridException { + @Override public void write(Object key, Object val) throws IgniteCheckedException { int part = 0; if (partitioner != null) { part = partitioner.partition(key, val, adders.length); if (part < 0 || part >= adders.length) - throw new GridException("Invalid partition: " + part); + throw new IgniteCheckedException("Invalid partition: " + part); } GridHadoopTaskOutput out = adders[part]; @@ -575,7 +575,7 @@ public class GridHadoopShuffleJob<T> implements AutoCloseable { } /** {@inheritDoc} */ - @Override public void close() throws GridException { + @Override public void close() throws IgniteCheckedException { for (GridHadoopTaskOutput adder : adders) { if (adder != null) adder.close(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06931b4b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/shuffle/GridHadoopShuffleMessage.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/shuffle/GridHadoopShuffleMessage.java b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/shuffle/GridHadoopShuffleMessage.java index 0034963..47d96f9 100644 --- a/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/shuffle/GridHadoopShuffleMessage.java +++ b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/shuffle/GridHadoopShuffleMessage.java @@ -9,6 +9,7 @@ package org.gridgain.grid.kernal.processors.hadoop.shuffle; +import org.apache.ignite.*; import org.gridgain.grid.*; import org.gridgain.grid.hadoop.*; import org.gridgain.grid.kernal.processors.hadoop.message.*; @@ -168,7 +169,7 @@ public class GridHadoopShuffleMessage implements GridHadoopMessage { /** * @param v Visitor. */ - public void visit(Visitor v) throws GridException { + public void visit(Visitor v) throws IgniteCheckedException { for (int i = 0; i < off;) { byte marker = buf[i++]; @@ -221,13 +222,13 @@ public class GridHadoopShuffleMessage implements GridHadoopMessage { * @param off Offset. * @param len Length. */ - public void onKey(byte[] buf, int off, int len) throws GridException; + public void onKey(byte[] buf, int off, int len) throws IgniteCheckedException; /** * @param buf Buffer. * @param off Offset. * @param len Length. */ - public void onValue(byte[] buf, int off, int len) throws GridException; + public void onValue(byte[] buf, int off, int len) throws IgniteCheckedException; } }