http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopStreamDelegate.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopStreamDelegate.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopStreamDelegate.java deleted file mode 100644 index 9aaab4c..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopStreamDelegate.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.igfs.hadoop; - -import org.apache.ignite.internal.util.typedef.internal.*; - -/** - * IGFS Hadoop stream descriptor. - */ -public class IgfsHadoopStreamDelegate { - /** RPC handler. */ - private final IgfsHadoopEx hadoop; - - /** Target. */ - private final Object target; - - /** Optional stream length. */ - private final long len; - - /** - * Constructor. - * - * @param target Target. - */ - public IgfsHadoopStreamDelegate(IgfsHadoopEx hadoop, Object target) { - this(hadoop, target, -1); - } - - /** - * Constructor. - * - * @param target Target. - * @param len Optional length. - */ - public IgfsHadoopStreamDelegate(IgfsHadoopEx hadoop, Object target, long len) { - assert hadoop != null; - assert target != null; - - this.hadoop = hadoop; - this.target = target; - this.len = len; - } - - /** - * @return RPC handler. - */ - public IgfsHadoopEx hadoop() { - return hadoop; - } - - /** - * @return Stream target. - */ - @SuppressWarnings("unchecked") - public <T> T target() { - return (T) target; - } - - /** - * @return Length. - */ - public long length() { - return len; - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - return System.identityHashCode(target); - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object obj) { - return obj != null && obj instanceof IgfsHadoopStreamDelegate && - target == ((IgfsHadoopStreamDelegate)obj).target; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(IgfsHadoopStreamDelegate.class, this); - } -}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopStreamEventListener.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopStreamEventListener.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopStreamEventListener.java deleted file mode 100644 index 20d7f2a..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopStreamEventListener.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.igfs.hadoop; - -import org.apache.ignite.*; - -/** - * IGFS input stream event listener. - */ -public interface IgfsHadoopStreamEventListener { - /** - * Callback invoked when the stream is being closed. - * - * @throws IgniteCheckedException If failed. - */ - public void onClose() throws IgniteCheckedException; - - /** - * Callback invoked when remote error occurs. - * - * @param errMsg Error message. - */ - public void onError(String errMsg); -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopUtils.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopUtils.java deleted file mode 100644 index bd96e60..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopUtils.java +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.igfs.hadoop; - -import org.apache.hadoop.conf.*; -import org.apache.hadoop.fs.*; -import org.apache.ignite.*; -import org.apache.ignite.igfs.*; -import org.apache.ignite.internal.processors.igfs.*; -import org.jetbrains.annotations.*; - -import java.io.*; - -/** - * Utility constants and methods for IGFS Hadoop file system. - */ -public class IgfsHadoopUtils { - /** Parameter name for endpoint no embed mode flag. */ - public static final String PARAM_IGFS_ENDPOINT_NO_EMBED = "fs.igfs.%s.endpoint.no_embed"; - - /** Parameter name for endpoint no shared memory flag. */ - public static final String PARAM_IGFS_ENDPOINT_NO_LOCAL_SHMEM = "fs.igfs.%s.endpoint.no_local_shmem"; - - /** Parameter name for endpoint no local TCP flag. */ - public static final String PARAM_IGFS_ENDPOINT_NO_LOCAL_TCP = "fs.igfs.%s.endpoint.no_local_tcp"; - - /** - * Get string parameter. - * - * @param cfg Configuration. - * @param name Parameter name. - * @param authority Authority. - * @param dflt Default value. - * @return String value. - */ - public static String parameter(Configuration cfg, String name, String authority, String dflt) { - return cfg.get(String.format(name, authority != null ? authority : ""), dflt); - } - - /** - * Get integer parameter. - * - * @param cfg Configuration. - * @param name Parameter name. - * @param authority Authority. - * @param dflt Default value. - * @return Integer value. - * @throws IOException In case of parse exception. - */ - public static int parameter(Configuration cfg, String name, String authority, int dflt) throws IOException { - String name0 = String.format(name, authority != null ? authority : ""); - - try { - return cfg.getInt(name0, dflt); - } - catch (NumberFormatException ignore) { - throw new IOException("Failed to parse parameter value to integer: " + name0); - } - } - - /** - * Get boolean parameter. - * - * @param cfg Configuration. - * @param name Parameter name. - * @param authority Authority. - * @param dflt Default value. - * @return Boolean value. - */ - public static boolean parameter(Configuration cfg, String name, String authority, boolean dflt) { - return cfg.getBoolean(String.format(name, authority != null ? authority : ""), dflt); - } - - /** - * Cast Ignite exception to appropriate IO exception. - * - * @param e Exception to cast. - * @return Casted exception. - */ - public static IOException cast(IgniteCheckedException e) { - return cast(e, null); - } - - /** - * Cast Ignite exception to appropriate IO exception. - * - * @param e Exception to cast. - * @param path Path for exceptions. - * @return Casted exception. - */ - @SuppressWarnings("unchecked") - public static IOException cast(IgniteCheckedException e, @Nullable String path) { - assert e != null; - - // First check for any nested IOException; if exists - re-throw it. - if (e.hasCause(IOException.class)) - return e.getCause(IOException.class); - else if (e.hasCause(IgfsFileNotFoundException.class)) - return new FileNotFoundException(path); // TODO: Or PathNotFoundException? - else if (e.hasCause(IgfsParentNotDirectoryException.class)) - return new ParentNotDirectoryException(path); - else if (path != null && e.hasCause(IgfsDirectoryNotEmptyException.class)) - return new PathIsNotEmptyDirectoryException(path); - else if (path != null && e.hasCause(IgfsPathAlreadyExistsException.class)) - return new PathExistsException(path); - else - return new IOException(e); - } - - /** - * Constructor. - */ - private IgfsHadoopUtils() { - // No-op. - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopWrapper.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopWrapper.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopWrapper.java deleted file mode 100644 index 5586e72..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopWrapper.java +++ /dev/null @@ -1,511 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.igfs.hadoop; - -import org.apache.commons.logging.*; -import org.apache.hadoop.conf.*; -import org.apache.ignite.*; -import org.apache.ignite.igfs.*; -import org.apache.ignite.internal.processors.igfs.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; -import java.util.concurrent.atomic.*; - -import static org.apache.ignite.internal.igfs.hadoop.IgfsHadoopEndpoint.*; -import static org.apache.ignite.internal.igfs.hadoop.IgfsHadoopUtils.*; - -/** - * Wrapper for IGFS server. - */ -public class IgfsHadoopWrapper implements IgfsHadoop { - /** Delegate. */ - private final AtomicReference<Delegate> delegateRef = new AtomicReference<>(); - - /** Authority. */ - private final String authority; - - /** Connection string. */ - private final IgfsHadoopEndpoint endpoint; - - /** Log directory. */ - private final String logDir; - - /** Configuration. */ - private final Configuration conf; - - /** Logger. */ - private final Log log; - - /** - * Constructor. - * - * @param authority Authority (connection string). - * @param logDir Log directory for server. - * @param conf Configuration. - * @param log Current logger. - */ - public IgfsHadoopWrapper(String authority, String logDir, Configuration conf, Log log) throws IOException { - try { - this.authority = authority; - this.endpoint = new IgfsHadoopEndpoint(authority); - this.logDir = logDir; - this.conf = conf; - this.log = log; - } - catch (IgniteCheckedException e) { - throw new IOException("Failed to parse endpoint: " + authority, e); - } - } - - /** {@inheritDoc} */ - @Override public IgfsHandshakeResponse handshake(String logDir) throws IOException { - return withReconnectHandling(new FileSystemClosure<IgfsHandshakeResponse>() { - @Override public IgfsHandshakeResponse apply(IgfsHadoopEx hadoop, - IgfsHandshakeResponse hndResp) { - return hndResp; - } - }); - } - - /** {@inheritDoc} */ - @Override public void close(boolean force) { - Delegate delegate = delegateRef.get(); - - if (delegate != null && delegateRef.compareAndSet(delegate, null)) - delegate.close(force); - } - - /** {@inheritDoc} */ - @Override public IgfsFile info(final IgfsPath path) throws IOException { - return withReconnectHandling(new FileSystemClosure<IgfsFile>() { - @Override public IgfsFile apply(IgfsHadoopEx hadoop, IgfsHandshakeResponse hndResp) - throws IgniteCheckedException, IOException { - return hadoop.info(path); - } - }, path); - } - - /** {@inheritDoc} */ - @Override public IgfsFile update(final IgfsPath path, final Map<String, String> props) throws IOException { - return withReconnectHandling(new FileSystemClosure<IgfsFile>() { - @Override public IgfsFile apply(IgfsHadoopEx hadoop, IgfsHandshakeResponse hndResp) - throws IgniteCheckedException, IOException { - return hadoop.update(path, props); - } - }, path); - } - - /** {@inheritDoc} */ - @Override public Boolean setTimes(final IgfsPath path, final long accessTime, final long modificationTime) - throws IOException { - return withReconnectHandling(new FileSystemClosure<Boolean>() { - @Override public Boolean apply(IgfsHadoopEx hadoop, IgfsHandshakeResponse hndResp) - throws IgniteCheckedException, IOException { - return hadoop.setTimes(path, accessTime, modificationTime); - } - }, path); - } - - /** {@inheritDoc} */ - @Override public Boolean rename(final IgfsPath src, final IgfsPath dest) throws IOException { - return withReconnectHandling(new FileSystemClosure<Boolean>() { - @Override public Boolean apply(IgfsHadoopEx hadoop, IgfsHandshakeResponse hndResp) - throws IgniteCheckedException, IOException { - return hadoop.rename(src, dest); - } - }, src); - } - - /** {@inheritDoc} */ - @Override public Boolean delete(final IgfsPath path, final boolean recursive) throws IOException { - return withReconnectHandling(new FileSystemClosure<Boolean>() { - @Override public Boolean apply(IgfsHadoopEx hadoop, IgfsHandshakeResponse hndResp) - throws IgniteCheckedException, IOException { - return hadoop.delete(path, recursive); - } - }, path); - } - - /** {@inheritDoc} */ - @Override public Collection<IgfsBlockLocation> affinity(final IgfsPath path, final long start, - final long len) throws IOException { - return withReconnectHandling(new FileSystemClosure<Collection<IgfsBlockLocation>>() { - @Override public Collection<IgfsBlockLocation> apply(IgfsHadoopEx hadoop, - IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException { - return hadoop.affinity(path, start, len); - } - }, path); - } - - /** {@inheritDoc} */ - @Override public IgfsPathSummary contentSummary(final IgfsPath path) throws IOException { - return withReconnectHandling(new FileSystemClosure<IgfsPathSummary>() { - @Override public IgfsPathSummary apply(IgfsHadoopEx hadoop, IgfsHandshakeResponse hndResp) - throws IgniteCheckedException, IOException { - return hadoop.contentSummary(path); - } - }, path); - } - - /** {@inheritDoc} */ - @Override public Boolean mkdirs(final IgfsPath path, final Map<String, String> props) throws IOException { - return withReconnectHandling(new FileSystemClosure<Boolean>() { - @Override public Boolean apply(IgfsHadoopEx hadoop, IgfsHandshakeResponse hndResp) - throws IgniteCheckedException, IOException { - return hadoop.mkdirs(path, props); - } - }, path); - } - - /** {@inheritDoc} */ - @Override public Collection<IgfsFile> listFiles(final IgfsPath path) throws IOException { - return withReconnectHandling(new FileSystemClosure<Collection<IgfsFile>>() { - @Override public Collection<IgfsFile> apply(IgfsHadoopEx hadoop, - IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException { - return hadoop.listFiles(path); - } - }, path); - } - - /** {@inheritDoc} */ - @Override public Collection<IgfsPath> listPaths(final IgfsPath path) throws IOException { - return withReconnectHandling(new FileSystemClosure<Collection<IgfsPath>>() { - @Override public Collection<IgfsPath> apply(IgfsHadoopEx hadoop, - IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException { - return hadoop.listPaths(path); - } - }, path); - } - - /** {@inheritDoc} */ - @Override public IgfsStatus fsStatus() throws IOException { - return withReconnectHandling(new FileSystemClosure<IgfsStatus>() { - @Override public IgfsStatus apply(IgfsHadoopEx hadoop, IgfsHandshakeResponse hndResp) - throws IgniteCheckedException, IOException { - return hadoop.fsStatus(); - } - }); - } - - /** {@inheritDoc} */ - @Override public IgfsHadoopStreamDelegate open(final IgfsPath path) throws IOException { - return withReconnectHandling(new FileSystemClosure<IgfsHadoopStreamDelegate>() { - @Override public IgfsHadoopStreamDelegate apply(IgfsHadoopEx hadoop, - IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException { - return hadoop.open(path); - } - }, path); - } - - /** {@inheritDoc} */ - @Override public IgfsHadoopStreamDelegate open(final IgfsPath path, final int seqReadsBeforePrefetch) - throws IOException { - return withReconnectHandling(new FileSystemClosure<IgfsHadoopStreamDelegate>() { - @Override public IgfsHadoopStreamDelegate apply(IgfsHadoopEx hadoop, - IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException { - return hadoop.open(path, seqReadsBeforePrefetch); - } - }, path); - } - - /** {@inheritDoc} */ - @Override public IgfsHadoopStreamDelegate create(final IgfsPath path, final boolean overwrite, - final boolean colocate, final int replication, final long blockSize, @Nullable final Map<String, String> props) - throws IOException { - return withReconnectHandling(new FileSystemClosure<IgfsHadoopStreamDelegate>() { - @Override public IgfsHadoopStreamDelegate apply(IgfsHadoopEx hadoop, - IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException { - return hadoop.create(path, overwrite, colocate, replication, blockSize, props); - } - }, path); - } - - /** {@inheritDoc} */ - @Override public IgfsHadoopStreamDelegate append(final IgfsPath path, final boolean create, - @Nullable final Map<String, String> props) throws IOException { - return withReconnectHandling(new FileSystemClosure<IgfsHadoopStreamDelegate>() { - @Override public IgfsHadoopStreamDelegate apply(IgfsHadoopEx hadoop, - IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException { - return hadoop.append(path, create, props); - } - }, path); - } - - /** - * Execute closure which is not path-specific. - * - * @param clo Closure. - * @return Result. - * @throws IOException If failed. - */ - private <T> T withReconnectHandling(FileSystemClosure<T> clo) throws IOException { - return withReconnectHandling(clo, null); - } - - /** - * Execute closure. - * - * @param clo Closure. - * @param path Path for exceptions. - * @return Result. - * @throws IOException If failed. - */ - private <T> T withReconnectHandling(final FileSystemClosure<T> clo, @Nullable IgfsPath path) - throws IOException { - Exception err = null; - - for (int i = 0; i < 2; i++) { - Delegate curDelegate = null; - - boolean close = false; - boolean force = false; - - try { - curDelegate = delegate(); - - assert curDelegate != null; - - close = curDelegate.doomed; - - return clo.apply(curDelegate.hadoop, curDelegate.hndResp); - } - catch (IgfsHadoopCommunicationException e) { - if (curDelegate != null && !curDelegate.doomed) { - // Try getting rid fo faulty delegate ASAP. - delegateRef.compareAndSet(curDelegate, null); - - close = true; - force = true; - } - - if (log.isDebugEnabled()) - log.debug("Failed to send message to a server: " + e); - - err = e; - } - catch (IgniteCheckedException e) { - throw IgfsHadoopUtils.cast(e, path != null ? path.toString() : null); - } - finally { - if (close) { - assert curDelegate != null; - - curDelegate.close(force); - } - } - } - - throw new IOException("Failed to communicate with IGFS.", err); - } - - /** - * Get delegate creating it if needed. - * - * @return Delegate. - */ - private Delegate delegate() throws IgfsHadoopCommunicationException { - Exception err = null; - - // 1. If delegate is set, return it immediately. - Delegate curDelegate = delegateRef.get(); - - if (curDelegate != null) - return curDelegate; - - // 2. Guess that we are in the same VM. - if (!parameter(conf, PARAM_IGFS_ENDPOINT_NO_EMBED, authority, false)) { - IgfsEx igfs = null; - - if (endpoint.grid() == null) { - try { - Ignite ignite = G.ignite(); - - igfs = (IgfsEx)ignite.fileSystem(endpoint.igfs()); - } - catch (Exception e) { - err = e; - } - } - else { - for (Ignite ignite : G.allGrids()) { - try { - igfs = (IgfsEx)ignite.fileSystem(endpoint.igfs()); - - break; - } - catch (Exception e) { - err = e; - } - } - } - - if (igfs != null) { - IgfsHadoopEx hadoop = null; - - try { - hadoop = new IgfsHadoopInProc(igfs, log); - - curDelegate = new Delegate(hadoop, hadoop.handshake(logDir)); - } - catch (IOException | IgniteCheckedException e) { - if (e instanceof IgfsHadoopCommunicationException) - hadoop.close(true); - - if (log.isDebugEnabled()) - log.debug("Failed to connect to in-proc IGFS, fallback to IPC mode.", e); - - err = e; - } - } - } - - // 3. Try connecting using shmem. - if (!parameter(conf, PARAM_IGFS_ENDPOINT_NO_LOCAL_SHMEM, authority, false)) { - if (curDelegate == null && !U.isWindows()) { - IgfsHadoopEx hadoop = null; - - try { - hadoop = new IgfsHadoopOutProc(endpoint.port(), endpoint.grid(), endpoint.igfs(), log); - - curDelegate = new Delegate(hadoop, hadoop.handshake(logDir)); - } - catch (IOException | IgniteCheckedException e) { - if (e instanceof IgfsHadoopCommunicationException) - hadoop.close(true); - - if (log.isDebugEnabled()) - log.debug("Failed to connect to out-proc local IGFS using shmem.", e); - - err = e; - } - } - } - - // 4. Try local TCP connection. - boolean skipLocTcp = parameter(conf, PARAM_IGFS_ENDPOINT_NO_LOCAL_TCP, authority, false); - - if (!skipLocTcp) { - if (curDelegate == null) { - IgfsHadoopEx hadoop = null; - - try { - hadoop = new IgfsHadoopOutProc(LOCALHOST, endpoint.port(), endpoint.grid(), endpoint.igfs(), - log); - - curDelegate = new Delegate(hadoop, hadoop.handshake(logDir)); - } - catch (IOException | IgniteCheckedException e) { - if (e instanceof IgfsHadoopCommunicationException) - hadoop.close(true); - - if (log.isDebugEnabled()) - log.debug("Failed to connect to out-proc local IGFS using TCP.", e); - - err = e; - } - } - } - - // 5. Try remote TCP connection. - if (curDelegate == null && (skipLocTcp || !F.eq(LOCALHOST, endpoint.host()))) { - IgfsHadoopEx hadoop = null; - - try { - hadoop = new IgfsHadoopOutProc(endpoint.host(), endpoint.port(), endpoint.grid(), endpoint.igfs(), log); - - curDelegate = new Delegate(hadoop, hadoop.handshake(logDir)); - } - catch (IOException | IgniteCheckedException e) { - if (e instanceof IgfsHadoopCommunicationException) - hadoop.close(true); - - if (log.isDebugEnabled()) - log.debug("Failed to connect to out-proc remote IGFS using TCP.", e); - - err = e; - } - } - - if (curDelegate != null) { - if (!delegateRef.compareAndSet(null, curDelegate)) - curDelegate.doomed = true; - - return curDelegate; - } - else - throw new IgfsHadoopCommunicationException("Failed to connect to IGFS: " + endpoint, err); - } - - /** - * File system operation closure. - */ - private static interface FileSystemClosure<T> { - /** - * Call closure body. - * - * @param hadoop RPC handler. - * @param hndResp Handshake response. - * @return Result. - * @throws IgniteCheckedException If failed. - * @throws IOException If failed. - */ - public T apply(IgfsHadoopEx hadoop, IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException; - } - - /** - * Delegate. - */ - private static class Delegate { - /** RPC handler. */ - private final IgfsHadoopEx hadoop; - - /** Handshake request. */ - private final IgfsHandshakeResponse hndResp; - - /** Close guard. */ - private final AtomicBoolean closeGuard = new AtomicBoolean(); - - /** Whether this delegate must be closed at the end of the next invocation. */ - private boolean doomed; - - /** - * Constructor. - * - * @param hadoop Hadoop. - * @param hndResp Handshake response. - */ - private Delegate(IgfsHadoopEx hadoop, IgfsHandshakeResponse hndResp) { - this.hadoop = hadoop; - this.hndResp = hndResp; - } - - /** - * Close underlying RPC handler. - * - * @param force Force flag. - */ - private void close(boolean force) { - if (closeGuard.compareAndSet(false, true)) - hadoop.close(force); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/package.html ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/package.html b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/package.html deleted file mode 100644 index ec380f2..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/package.html +++ /dev/null @@ -1,24 +0,0 @@ -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> - -<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> -<html> -<body> - <!-- Package description. --> - Contains IGFS client classes. -</body> -</html> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/package.html ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/package.html b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/package.html deleted file mode 100644 index 4b070d3..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/package.html +++ /dev/null @@ -1,24 +0,0 @@ -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> - -<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> -<html> -<body> - <!-- Package description. --> - Contains IGFS client and common classes. -</body> -</html> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopClassLoader.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopClassLoader.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopClassLoader.java deleted file mode 100644 index bc4c0bb..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopClassLoader.java +++ /dev/null @@ -1,552 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.ignite.*; -import org.apache.ignite.internal.processors.hadoop.v2.*; -import org.apache.ignite.internal.util.typedef.*; -import org.jdk8.backport.*; -import org.jetbrains.annotations.*; -import org.objectweb.asm.*; -import org.objectweb.asm.commons.*; - -import java.io.*; -import java.net.*; -import java.util.*; -import java.util.concurrent.atomic.*; - -/** - * Class loader allowing explicitly load classes without delegation to parent class loader. - * Also supports class parsing for finding dependencies which contain transitive dependencies - * unavailable for parent. - */ -public class GridHadoopClassLoader extends URLClassLoader { - /** - * We are very parallel capable. - */ - static { - registerAsParallelCapable(); - } - - /** */ - private static final URLClassLoader APP_CLS_LDR = (URLClassLoader)GridHadoopClassLoader.class.getClassLoader(); - - /** */ - private static final Collection<URL> appJars = F.asList(APP_CLS_LDR.getURLs()); - - /** */ - private static volatile Collection<URL> hadoopJars; - - /** */ - private static final Map<String, Boolean> cache = new ConcurrentHashMap8<>(); - - /** */ - private static final Map<String, byte[]> bytesCache = new ConcurrentHashMap8<>(); - - /** - * @param urls Urls. - */ - public GridHadoopClassLoader(URL[] urls) { - super(addHadoopUrls(urls), APP_CLS_LDR); - - assert !(getParent() instanceof GridHadoopClassLoader); - } - - /** - * Need to parse only Ignite Hadoop and IGFS classes. - * - * @param cls Class name. - * @return {@code true} if we need to check this class. - */ - private static boolean isIgfsHadoop(String cls) { - String ignitePackagePrefix = "org.apache.ignite"; - int len = ignitePackagePrefix.length(); - - return cls.startsWith(ignitePackagePrefix) && (cls.indexOf("igfs.", len) != -1 || cls.indexOf(".fs.", len) != -1 || cls.indexOf("hadoop.", len) != -1); - } - - /** - * @param cls Class name. - * @return {@code true} If this is Hadoop class. - */ - private static boolean isHadoop(String cls) { - return cls.startsWith("org.apache.hadoop."); - } - - /** {@inheritDoc} */ - @Override protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException { - try { - if (isHadoop(name)) { // Always load Hadoop classes explicitly, since Hadoop can be available in App classpath. - if (name.endsWith(".util.ShutdownHookManager")) // Dirty hack to get rid of Hadoop shutdown hooks. - return loadFromBytes(name, GridHadoopShutdownHookManager.class.getName()); - else if (name.endsWith(".util.NativeCodeLoader")) - return loadFromBytes(name, GridHadoopNativeCodeLoader.class.getName()); - - return loadClassExplicitly(name, resolve); - } - - if (isIgfsHadoop(name)) { // For Ignite Hadoop and IGFS classes we have to check if they depend on Hadoop. - Boolean hasDeps = cache.get(name); - - if (hasDeps == null) { - hasDeps = hasExternalDependencies(name, new HashSet<String>()); - - cache.put(name, hasDeps); - } - - if (hasDeps) - return loadClassExplicitly(name, resolve); - } - - return super.loadClass(name, resolve); - } - catch (NoClassDefFoundError | ClassNotFoundException e) { - throw new ClassNotFoundException("Failed to load class: " + name, e); - } - } - - /** - * @param name Name. - * @param replace Replacement. - * @return Class. - */ - private Class<?> loadFromBytes(final String name, final String replace) { - synchronized (getClassLoadingLock(name)) { - // First, check if the class has already been loaded - Class c = findLoadedClass(name); - - if (c != null) - return c; - - byte[] bytes = bytesCache.get(name); - - if (bytes == null) { - InputStream in = loadClassBytes(getParent(), replace); - - ClassReader rdr; - - try { - rdr = new ClassReader(in); - } - catch (IOException e) { - throw new RuntimeException(e); - } - - ClassWriter w = new ClassWriter(Opcodes.ASM4); - - rdr.accept(new RemappingClassAdapter(w, new Remapper() { - /** */ - String replaceType = replace.replace('.', '/'); - - /** */ - String nameType = name.replace('.', '/'); - - @Override public String map(String type) { - if (type.equals(replaceType)) - return nameType; - - return type; - } - }), ClassReader.EXPAND_FRAMES); - - bytes = w.toByteArray(); - - bytesCache.put(name, bytes); - } - - return defineClass(name, bytes, 0, bytes.length); - } - } - - /** - * @param name Class name. - * @param resolve Resolve class. - * @return Class. - * @throws ClassNotFoundException If failed. - */ - private Class<?> loadClassExplicitly(String name, boolean resolve) throws ClassNotFoundException { - synchronized (getClassLoadingLock(name)) { - // First, check if the class has already been loaded - Class c = findLoadedClass(name); - - if (c == null) { - long t1 = System.nanoTime(); - - c = findClass(name); - - // this is the defining class loader; record the stats - sun.misc.PerfCounter.getFindClassTime().addElapsedTimeFrom(t1); - sun.misc.PerfCounter.getFindClasses().increment(); - } - - if (resolve) - resolveClass(c); - - return c; - } - } - - /** - * @param ldr Loader. - * @param clsName Class. - * @return Input stream. - */ - @Nullable private InputStream loadClassBytes(ClassLoader ldr, String clsName) { - return ldr.getResourceAsStream(clsName.replace('.', '/') + ".class"); - } - - /** - * @param clsName Class name. - * @return {@code true} If the class has external dependencies. - */ - boolean hasExternalDependencies(final String clsName, final Set<String> visited) { - if (isHadoop(clsName)) // Hadoop must not be in classpath but Idea sucks, so filtering explicitly as external. - return true; - - // Try to get from parent to check if the type accessible. - InputStream in = loadClassBytes(getParent(), clsName); - - if (in == null) // The class is external itself, it must be loaded from this class loader. - return true; - - if (!isIgfsHadoop(clsName)) // Other classes should not have external dependencies. - return false; - - final ClassReader rdr; - - try { - rdr = new ClassReader(in); - } - catch (IOException e) { - throw new RuntimeException("Failed to read class: " + clsName, e); - } - - visited.add(clsName); - - final AtomicBoolean hasDeps = new AtomicBoolean(); - - rdr.accept(new ClassVisitor(Opcodes.ASM4) { - AnnotationVisitor av = new AnnotationVisitor(Opcodes.ASM4) { - // TODO - }; - - FieldVisitor fv = new FieldVisitor(Opcodes.ASM4) { - @Override public AnnotationVisitor visitAnnotation(String desc, boolean b) { - onType(desc); - - return av; - } - }; - - MethodVisitor mv = new MethodVisitor(Opcodes.ASM4) { - @Override public AnnotationVisitor visitAnnotation(String desc, boolean b) { - onType(desc); - - return av; - } - - @Override public AnnotationVisitor visitParameterAnnotation(int i, String desc, boolean b) { - onType(desc); - - return av; - } - - @Override public AnnotationVisitor visitAnnotationDefault() { - return av; - } - - @Override public void visitFieldInsn(int i, String owner, String name, String desc) { - onType(owner); - onType(desc); - } - - @Override public void visitFrame(int i, int i2, Object[] locTypes, int i3, Object[] stackTypes) { - for (Object o : locTypes) { - if (o instanceof String) - onType((String)o); - } - - for (Object o : stackTypes) { - if (o instanceof String) - onType((String)o); - } - } - - @Override public void visitLocalVariable(String name, String desc, String signature, Label lb, - Label lb2, int i) { - onType(desc); - } - - @Override public void visitMethodInsn(int i, String owner, String name, String desc) { - onType(owner); - } - - @Override public void visitMultiANewArrayInsn(String desc, int dim) { - onType(desc); - } - - @Override public void visitTryCatchBlock(Label lb, Label lb2, Label lb3, String e) { - onType(e); - } - }; - - void onClass(String depCls) { - assert validateClassName(depCls) : depCls; - - if (depCls.startsWith("java.")) // Filter out platform classes. - return; - - if (visited.contains(depCls)) - return; - - Boolean res = cache.get(depCls); - - if (res == Boolean.TRUE || (res == null && hasExternalDependencies(depCls, visited))) - hasDeps.set(true); - } - - void onType(String type) { - if (type == null) - return; - - int off = 0; - - while (type.charAt(off) == '[') - off++; // Handle arrays. - - if (off != 0) - type = type.substring(off); - - if (type.length() == 1) - return; // Get rid of primitives. - - if (type.charAt(type.length() - 1) == ';') { - assert type.charAt(0) == 'L' : type; - - type = type.substring(1, type.length() - 1); - } - - type = type.replace('/', '.'); - - onClass(type); - } - - @Override public void visit(int i, int i2, String name, String signature, String superName, - String[] ifaces) { - onType(superName); - - if (ifaces != null) { - for (String iface : ifaces) - onType(iface); - } - } - - @Override public AnnotationVisitor visitAnnotation(String desc, boolean visible) { - onType(desc); - - return av; - } - - @Override public void visitInnerClass(String name, String outerName, String innerName, int i) { - onType(name); - } - - @Override public FieldVisitor visitField(int i, String name, String desc, String signature, Object val) { - onType(desc); - - return fv; - } - - @Override public MethodVisitor visitMethod(int i, String name, String desc, String signature, - String[] exceptions) { - if (exceptions != null) { - for (String e : exceptions) - onType(e); - } - - return mv; - } - }, 0); - - if (hasDeps.get()) // We already know that we have dependencies, no need to check parent. - return true; - - // Here we are known to not have any dependencies but possibly we have a parent which have them. - int idx = clsName.lastIndexOf('$'); - - if (idx == -1) // No parent class. - return false; - - String parentCls = clsName.substring(0, idx); - - if (visited.contains(parentCls)) - return false; - - Boolean res = cache.get(parentCls); - - if (res == null) - res = hasExternalDependencies(parentCls, visited); - - return res; - } - - /** - * @param name Class name. - * @return {@code true} If this is a valid class name. - */ - private static boolean validateClassName(String name) { - int len = name.length(); - - if (len <= 1) - return false; - - if (!Character.isJavaIdentifierStart(name.charAt(0))) - return false; - - boolean hasDot = false; - - for (int i = 1; i < len; i++) { - char c = name.charAt(i); - - if (c == '.') - hasDot = true; - else if (!Character.isJavaIdentifierPart(c)) - return false; - } - - return hasDot; - } - - /** - * @param name Variable name. - * @param dflt Default. - * @return Value. - */ - private static String getEnv(String name, String dflt) { - String res = System.getProperty(name); - - if (F.isEmpty(res)) - res = System.getenv(name); - - return F.isEmpty(res) ? dflt : res; - } - - /** - * @param res Result. - * @param dir Directory. - * @param startsWith Starts with prefix. - * @throws MalformedURLException If failed. - */ - private static void addUrls(Collection<URL> res, File dir, final String startsWith) throws Exception { - File[] files = dir.listFiles(new FilenameFilter() { - @Override public boolean accept(File dir, String name) { - return startsWith == null || name.startsWith(startsWith); - } - }); - - if (files == null) - throw new IOException("Path is not a directory: " + dir); - - for (File file : files) - res.add(file.toURI().toURL()); - } - - /** - * @param urls URLs. - * @return URLs. - */ - private static URL[] addHadoopUrls(URL[] urls) { - Collection<URL> hadoopJars; - - try { - hadoopJars = hadoopUrls(); - } - catch (IgniteCheckedException e) { - throw new RuntimeException(e); - } - - ArrayList<URL> list = new ArrayList<>(hadoopJars.size() + appJars.size() + (urls == null ? 0 : urls.length)); - - list.addAll(appJars); - list.addAll(hadoopJars); - - if (!F.isEmpty(urls)) - list.addAll(F.asList(urls)); - - return list.toArray(new URL[list.size()]); - } - - /** - * @return HADOOP_HOME Variable. - */ - @Nullable public static String hadoopHome() { - return getEnv("HADOOP_PREFIX", getEnv("HADOOP_HOME", null)); - } - - /** - * @return Collection of jar URLs. - * @throws IgniteCheckedException If failed. - */ - public static Collection<URL> hadoopUrls() throws IgniteCheckedException { - Collection<URL> hadoopUrls = hadoopJars; - - if (hadoopUrls != null) - return hadoopUrls; - - synchronized (GridHadoopClassLoader.class) { - hadoopUrls = hadoopJars; - - if (hadoopUrls != null) - return hadoopUrls; - - hadoopUrls = new ArrayList<>(); - - String hadoopPrefix = hadoopHome(); - - if (F.isEmpty(hadoopPrefix)) - throw new IgniteCheckedException("Failed resolve Hadoop installation location. Either HADOOP_PREFIX or " + - "HADOOP_HOME environment variables must be set."); - - String commonHome = getEnv("HADOOP_COMMON_HOME", hadoopPrefix + "/share/hadoop/common"); - String hdfsHome = getEnv("HADOOP_HDFS_HOME", hadoopPrefix + "/share/hadoop/hdfs"); - String mapredHome = getEnv("HADOOP_MAPRED_HOME", hadoopPrefix + "/share/hadoop/mapreduce"); - - try { - addUrls(hadoopUrls, new File(commonHome + "/lib"), null); - addUrls(hadoopUrls, new File(hdfsHome + "/lib"), null); - addUrls(hadoopUrls, new File(mapredHome + "/lib"), null); - - addUrls(hadoopUrls, new File(hdfsHome), "hadoop-hdfs-"); - - addUrls(hadoopUrls, new File(commonHome), "hadoop-common-"); - addUrls(hadoopUrls, new File(commonHome), "hadoop-auth-"); - addUrls(hadoopUrls, new File(commonHome + "/lib"), "hadoop-auth-"); - - addUrls(hadoopUrls, new File(mapredHome), "hadoop-mapreduce-client-common"); - addUrls(hadoopUrls, new File(mapredHome), "hadoop-mapreduce-client-core"); - } - catch (Exception e) { - throw new IgniteCheckedException(e); - } - - hadoopJars = hadoopUrls; - - return hadoopUrls; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopComponent.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopComponent.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopComponent.java deleted file mode 100644 index 337bfe9..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopComponent.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.ignite.*; - -/** - * Abstract class for all hadoop components. - */ -public abstract class GridHadoopComponent { - /** Hadoop context. */ - protected GridHadoopContext ctx; - - /** Logger. */ - protected IgniteLogger log; - - /** - * @param ctx Hadoop context. - */ - public void start(GridHadoopContext ctx) throws IgniteCheckedException { - this.ctx = ctx; - - log = ctx.kernalContext().log(getClass()); - } - - /** - * Stops manager. - */ - public void stop(boolean cancel) { - // No-op. - } - - /** - * Callback invoked when all grid components are started. - */ - public void onKernalStart() throws IgniteCheckedException { - // No-op. - } - - /** - * Callback invoked before all grid components are stopped. - */ - public void onKernalStop(boolean cancel) { - // No-op. - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopContext.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopContext.java deleted file mode 100644 index 3160e3d..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopContext.java +++ /dev/null @@ -1,196 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.ignite.cluster.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.processors.hadoop.jobtracker.*; -import org.apache.ignite.internal.processors.hadoop.shuffle.*; -import org.apache.ignite.internal.processors.hadoop.taskexecutor.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.util.*; - -/** - * Hadoop accelerator context. - */ -public class GridHadoopContext { - /** Kernal context. */ - private GridKernalContext ctx; - - /** Hadoop configuration. */ - private GridHadoopConfiguration cfg; - - /** Job tracker. */ - private GridHadoopJobTracker jobTracker; - - /** External task executor. */ - private GridHadoopTaskExecutorAdapter taskExecutor; - - /** */ - private GridHadoopShuffle shuffle; - - /** Managers list. */ - private List<GridHadoopComponent> components = new ArrayList<>(); - - /** - * @param ctx Kernal context. - */ - public GridHadoopContext( - GridKernalContext ctx, - GridHadoopConfiguration cfg, - GridHadoopJobTracker jobTracker, - GridHadoopTaskExecutorAdapter taskExecutor, - GridHadoopShuffle shuffle - ) { - this.ctx = ctx; - this.cfg = cfg; - - this.jobTracker = add(jobTracker); - this.taskExecutor = add(taskExecutor); - this.shuffle = add(shuffle); - } - - /** - * Gets list of managers. - * - * @return List of managers. - */ - public List<GridHadoopComponent> components() { - return components; - } - - /** - * Gets kernal context. - * - * @return Grid kernal context instance. - */ - public GridKernalContext kernalContext() { - return ctx; - } - - /** - * Gets Hadoop configuration. - * - * @return Hadoop configuration. - */ - public GridHadoopConfiguration configuration() { - return cfg; - } - - /** - * Gets local node ID. Shortcut for {@code kernalContext().localNodeId()}. - * - * @return Local node ID. - */ - public UUID localNodeId() { - return ctx.localNodeId(); - } - - /** - * Gets local node order. - * - * @return Local node order. - */ - public long localNodeOrder() { - assert ctx.discovery() != null; - - return ctx.discovery().localNode().order(); - } - - /** - * @return Hadoop-enabled nodes. - */ - public Collection<ClusterNode> nodes() { - return ctx.discovery().cacheNodes(CU.SYS_CACHE_HADOOP_MR, ctx.discovery().topologyVersion()); - } - - /** - * @return {@code True} if - */ - public boolean jobUpdateLeader() { - long minOrder = Long.MAX_VALUE; - ClusterNode minOrderNode = null; - - for (ClusterNode node : nodes()) { - if (node.order() < minOrder) { - minOrder = node.order(); - minOrderNode = node; - } - } - - assert minOrderNode != null; - - return localNodeId().equals(minOrderNode.id()); - } - - /** - * @param meta Job metadata. - * @return {@code true} If local node is participating in job execution. - */ - public boolean isParticipating(GridHadoopJobMetadata meta) { - UUID locNodeId = localNodeId(); - - if (locNodeId.equals(meta.submitNodeId())) - return true; - - GridHadoopMapReducePlan plan = meta.mapReducePlan(); - - return plan.mapperNodeIds().contains(locNodeId) || plan.reducerNodeIds().contains(locNodeId) || jobUpdateLeader(); - } - - /** - * @return Jon tracker instance. - */ - public GridHadoopJobTracker jobTracker() { - return jobTracker; - } - - /** - * @return Task executor. - */ - public GridHadoopTaskExecutorAdapter taskExecutor() { - return taskExecutor; - } - - /** - * @return Shuffle. - */ - public GridHadoopShuffle shuffle() { - return shuffle; - } - - /** - * @return Map-reduce planner. - */ - public GridHadoopMapReducePlanner planner() { - return cfg.getMapReducePlanner(); - } - - /** - * Adds component. - * - * @param c Component to add. - * @return Added manager. - */ - private <C extends GridHadoopComponent> C add(C c) { - components.add(c); - - return c; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopDefaultJobInfo.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopDefaultJobInfo.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopDefaultJobInfo.java deleted file mode 100644 index 555c573..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopDefaultJobInfo.java +++ /dev/null @@ -1,163 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.ignite.*; -import org.apache.ignite.internal.processors.hadoop.v2.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.lang.reflect.*; -import java.util.*; - -/** - * Hadoop job info based on default Hadoop configuration. - */ -public class GridHadoopDefaultJobInfo implements GridHadoopJobInfo, Externalizable { - /** */ - private static final long serialVersionUID = 5489900236464999951L; - - /** {@code true} If job has combiner. */ - private boolean hasCombiner; - - /** Number of reducers configured for job. */ - private int numReduces; - - /** Configuration. */ - private Map<String,String> props = new HashMap<>(); - - /** Job name. */ - private String jobName; - - /** User name. */ - private String user; - - /** */ - private static volatile Class<?> jobCls; - - /** - * Default constructor required by {@link Externalizable}. - */ - public GridHadoopDefaultJobInfo() { - // No-op. - } - - /** - * Constructor. - * - * @param jobName Job name. - * @param user User name. - * @param hasCombiner {@code true} If job has combiner. - * @param numReduces Number of reducers configured for job. - * @param props All other properties of the job. - */ - public GridHadoopDefaultJobInfo(String jobName, String user, boolean hasCombiner, int numReduces, - Map<String, String> props) { - this.jobName = jobName; - this.user = user; - this.hasCombiner = hasCombiner; - this.numReduces = numReduces; - this.props = props; - } - - /** {@inheritDoc} */ - @Nullable @Override public String property(String name) { - return props.get(name); - } - - /** {@inheritDoc} */ - @Override public GridHadoopJob createJob(GridHadoopJobId jobId, IgniteLogger log) throws IgniteCheckedException { - try { - Class<?> jobCls0 = jobCls; - - if (jobCls0 == null) { // It is enough to have only one class loader with only Hadoop classes. - synchronized (GridHadoopDefaultJobInfo.class) { - if ((jobCls0 = jobCls) == null) { - GridHadoopClassLoader ldr = new GridHadoopClassLoader(null); - - jobCls = jobCls0 = ldr.loadClass(GridHadoopV2Job.class.getName()); - } - } - } - - Constructor<?> constructor = jobCls0.getConstructor(GridHadoopJobId.class, GridHadoopDefaultJobInfo.class, - IgniteLogger.class); - - return (GridHadoopJob)constructor.newInstance(jobId, this, log); - } - // NB: java.lang.NoClassDefFoundError may be thrown from Class#getConstructor() call. - catch (Throwable t) { - throw new IgniteCheckedException(t); - } - } - - /** {@inheritDoc} */ - @Override public boolean hasCombiner() { - return hasCombiner; - } - - /** {@inheritDoc} */ - @Override public boolean hasReducer() { - return reducers() > 0; - } - - /** {@inheritDoc} */ - @Override public int reducers() { - return numReduces; - } - - /** {@inheritDoc} */ - @Override public String jobName() { - return jobName; - } - - /** {@inheritDoc} */ - @Override public String user() { - return user; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeString(out, jobName); - U.writeString(out, user); - - out.writeBoolean(hasCombiner); - out.writeInt(numReduces); - - U.writeStringMap(out, props); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - jobName = U.readString(in); - user = U.readString(in); - - hasCombiner = in.readBoolean(); - numReduces = in.readInt(); - - props = U.readStringMap(in); - } - - /** - * @return Properties of the job. - */ - public Map<String, String> properties() { - return props; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopImpl.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopImpl.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopImpl.java deleted file mode 100644 index 55e3690..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopImpl.java +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.ignite.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.util.*; -import org.jetbrains.annotations.*; - -/** - * Hadoop facade implementation. - */ -public class GridHadoopImpl implements GridHadoop { - /** Hadoop processor. */ - private final IgniteHadoopProcessor proc; - - /** Busy lock. */ - private final GridSpinBusyLock busyLock = new GridSpinBusyLock(); - - /** - * Constructor. - * - * @param proc Hadoop processor. - */ - GridHadoopImpl(IgniteHadoopProcessor proc) { - this.proc = proc; - } - - /** {@inheritDoc} */ - @Override public GridHadoopConfiguration configuration() { - return proc.config(); - } - - /** {@inheritDoc} */ - @Override public GridHadoopJobId nextJobId() { - if (busyLock.enterBusy()) { - try { - return proc.nextJobId(); - } - finally { - busyLock.leaveBusy(); - } - } - else - throw new IllegalStateException("Failed to get next job ID (grid is stopping)."); - } - - /** {@inheritDoc} */ - @Override public IgniteInternalFuture<?> submit(GridHadoopJobId jobId, GridHadoopJobInfo jobInfo) { - if (busyLock.enterBusy()) { - try { - return proc.submit(jobId, jobInfo); - } - finally { - busyLock.leaveBusy(); - } - } - else - throw new IllegalStateException("Failed to submit job (grid is stopping)."); - } - - /** {@inheritDoc} */ - @Nullable @Override public GridHadoopJobStatus status(GridHadoopJobId jobId) throws IgniteCheckedException { - if (busyLock.enterBusy()) { - try { - return proc.status(jobId); - } - finally { - busyLock.leaveBusy(); - } - } - else - throw new IllegalStateException("Failed to get job status (grid is stopping)."); - } - - /** {@inheritDoc} */ - @Nullable @Override public GridHadoopCounters counters(GridHadoopJobId jobId) throws IgniteCheckedException { - if (busyLock.enterBusy()) { - try { - return proc.counters(jobId); - } - finally { - busyLock.leaveBusy(); - } - } - else - throw new IllegalStateException("Failed to get job counters (grid is stopping)."); - } - - /** {@inheritDoc} */ - @Nullable @Override public IgniteInternalFuture<?> finishFuture(GridHadoopJobId jobId) throws IgniteCheckedException { - if (busyLock.enterBusy()) { - try { - return proc.finishFuture(jobId); - } - finally { - busyLock.leaveBusy(); - } - } - else - throw new IllegalStateException("Failed to get job finish future (grid is stopping)."); - } - - /** {@inheritDoc} */ - @Override public boolean kill(GridHadoopJobId jobId) throws IgniteCheckedException { - if (busyLock.enterBusy()) { - try { - return proc.kill(jobId); - } - finally { - busyLock.leaveBusy(); - } - } - else - throw new IllegalStateException("Failed to kill job (grid is stopping)."); - } -}