http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4c85f120/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIo.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIo.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIo.java new file mode 100644 index 0000000..c9c61fe --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIo.java @@ -0,0 +1,599 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.igfs; + +import org.apache.commons.logging.*; +import org.apache.ignite.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.igfs.common.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.internal.util.ipc.*; +import org.apache.ignite.internal.util.ipc.shmem.*; +import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jdk8.backport.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; +import java.util.concurrent.locks.*; + +/** + * IO layer implementation based on blocking IPC streams. + */ +@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") +public class HadoopIgfsIpcIo implements HadoopIgfsIo { + /** Logger. */ + private Log log; + + /** Request futures map. */ + private ConcurrentMap<Long, HadoopIgfsFuture> reqMap = + new ConcurrentHashMap8<>(); + + /** Request ID counter. */ + private AtomicLong reqIdCnt = new AtomicLong(); + + /** Endpoint. */ + private IpcEndpoint endpoint; + + /** Endpoint output stream. */ + private IgfsDataOutputStream out; + + /** Protocol. */ + private final IgfsMarshaller marsh; + + /** Client reader thread. */ + private Thread reader; + + /** Lock for graceful shutdown. */ + private final ReadWriteLock busyLock = new ReentrantReadWriteLock(); + + /** Stopping flag. */ + private volatile boolean stopping; + + /** Server endpoint address. */ + private final String endpointAddr; + + /** Number of open file system sessions. */ + private final AtomicInteger activeCnt = new AtomicInteger(1); + + /** Event listeners. */ + private final Collection<HadoopIgfsIpcIoListener> lsnrs = + new GridConcurrentHashSet<>(); + + /** Cached connections. */ + private static final ConcurrentMap<String, HadoopIgfsIpcIo> ipcCache = + new ConcurrentHashMap8<>(); + + /** Striped lock that prevents multiple instance creation in {@link #get(Log, String)}. */ + private static final GridStripedLock initLock = new GridStripedLock(32); + + /** + * @param endpointAddr Endpoint. + * @param marsh Protocol. + * @param log Logger to use. + */ + public HadoopIgfsIpcIo(String endpointAddr, IgfsMarshaller marsh, Log log) { + assert endpointAddr != null; + assert marsh != null; + + this.endpointAddr = endpointAddr; + this.marsh = marsh; + this.log = log; + } + + /** + * Returns a started and valid instance of this class + * for a given endpoint. + * + * @param log Logger to use for new instance. + * @param endpoint Endpoint string. + * @return New or existing cached instance, which is started and operational. + * @throws IOException If new instance was created but failed to start. + */ + public static HadoopIgfsIpcIo get(Log log, String endpoint) throws IOException { + while (true) { + HadoopIgfsIpcIo clientIo = ipcCache.get(endpoint); + + if (clientIo != null) { + if (clientIo.acquire()) + return clientIo; + else + // If concurrent close. + ipcCache.remove(endpoint, clientIo); + } + else { + Lock lock = initLock.getLock(endpoint); + + lock.lock(); + + try { + clientIo = ipcCache.get(endpoint); + + if (clientIo != null) { // Perform double check. + if (clientIo.acquire()) + return clientIo; + else + // If concurrent close. + ipcCache.remove(endpoint, clientIo); + } + + // Otherwise try creating a new one. + clientIo = new HadoopIgfsIpcIo(endpoint, new IgfsMarshaller(), log); + + try { + clientIo.start(); + } + catch (IgniteCheckedException e) { + throw new IOException(e.getMessage(), e); + } + + HadoopIgfsIpcIo old = ipcCache.putIfAbsent(endpoint, clientIo); + + // Put in exclusive lock. + assert old == null; + + return clientIo; + } + finally { + lock.unlock(); + } + } + } + } + + /** + * Increases usage count for this instance. + * + * @return {@code true} if usage count is greater than zero. + */ + private boolean acquire() { + while (true) { + int cnt = activeCnt.get(); + + if (cnt == 0) { + if (log.isDebugEnabled()) + log.debug("IPC IO not acquired (count was 0): " + this); + + return false; + } + + // Need to make sure that no-one decremented count in between. + if (activeCnt.compareAndSet(cnt, cnt + 1)) { + if (log.isDebugEnabled()) + log.debug("IPC IO acquired: " + this); + + return true; + } + } + } + + /** + * Releases this instance, decrementing usage count. + * <p> + * If usage count becomes zero, the instance is stopped + * and removed from cache. + */ + public void release() { + while (true) { + int cnt = activeCnt.get(); + + if (cnt == 0) { + if (log.isDebugEnabled()) + log.debug("IPC IO not released (count was 0): " + this); + + return; + } + + if (activeCnt.compareAndSet(cnt, cnt - 1)) { + if (cnt == 1) { + ipcCache.remove(endpointAddr, this); + + if (log.isDebugEnabled()) + log.debug("IPC IO stopping as unused: " + this); + + stop(); + } + else if (log.isDebugEnabled()) + log.debug("IPC IO released: " + this); + + return; + } + } + } + + /** + * Closes this IO instance, removing it from cache. + */ + public void forceClose() { + if (ipcCache.remove(endpointAddr, this)) + stop(); + } + + /** + * Starts the IO. + * + * @throws IgniteCheckedException If failed to connect the endpoint. + */ + private void start() throws IgniteCheckedException { + boolean success = false; + + try { + endpoint = IpcEndpointFactory.connectEndpoint( + endpointAddr, new GridLoggerProxy(new HadoopIgfsJclLogger(log), null, null, "")); + + out = new IgfsDataOutputStream(new BufferedOutputStream(endpoint.outputStream())); + + reader = new ReaderThread(); + + // Required for Hadoop 2.x + reader.setDaemon(true); + + reader.start(); + + success = true; + } + catch (IgniteCheckedException e) { + IpcOutOfSystemResourcesException resEx = e.getCause(IpcOutOfSystemResourcesException.class); + + if (resEx != null) + throw new IgniteCheckedException(IpcSharedMemoryServerEndpoint.OUT_OF_RESOURCES_MSG, resEx); + + throw e; + } + finally { + if (!success) + stop(); + } + } + + /** + * Shuts down the IO. No send requests will be accepted anymore, all pending futures will be failed. + * Close listeners will be invoked as if connection is closed by server. + */ + private void stop() { + close0(null); + + if (reader != null) { + try { + U.interrupt(reader); + U.join(reader); + + reader = null; + } + catch (IgniteInterruptedCheckedException ignored) { + Thread.currentThread().interrupt(); + + log.warn("Got interrupted while waiting for reader thread to shut down (will return)."); + } + } + } + + /** {@inheritDoc} */ + @Override public void addEventListener(HadoopIgfsIpcIoListener lsnr) { + if (!busyLock.readLock().tryLock()) { + lsnr.onClose(); + + return; + } + + boolean invokeNow = false; + + try { + invokeNow = stopping; + + if (!invokeNow) + lsnrs.add(lsnr); + } + finally { + busyLock.readLock().unlock(); + + if (invokeNow) + lsnr.onClose(); + } + } + + /** {@inheritDoc} */ + @Override public void removeEventListener(HadoopIgfsIpcIoListener lsnr) { + lsnrs.remove(lsnr); + } + + /** {@inheritDoc} */ + @Override public GridPlainFuture<IgfsMessage> send(IgfsMessage msg) throws IgniteCheckedException { + return send(msg, null, 0, 0); + } + + /** {@inheritDoc} */ + @Override public <T> GridPlainFuture<T> send(IgfsMessage msg, @Nullable byte[] outBuf, int outOff, + int outLen) throws IgniteCheckedException { + assert outBuf == null || msg.command() == IgfsIpcCommand.READ_BLOCK; + + if (!busyLock.readLock().tryLock()) + throw new HadoopIgfsCommunicationException("Failed to send message (client is being concurrently " + + "closed)."); + + try { + if (stopping) + throw new HadoopIgfsCommunicationException("Failed to send message (client is being concurrently " + + "closed)."); + + long reqId = reqIdCnt.getAndIncrement(); + + HadoopIgfsFuture<T> fut = new HadoopIgfsFuture<>(); + + fut.outputBuffer(outBuf); + fut.outputOffset(outOff); + fut.outputLength(outLen); + fut.read(msg.command() == IgfsIpcCommand.READ_BLOCK); + + HadoopIgfsFuture oldFut = reqMap.putIfAbsent(reqId, fut); + + assert oldFut == null; + + if (log.isDebugEnabled()) + log.debug("Sending IGFS message [reqId=" + reqId + ", msg=" + msg + ']'); + + byte[] hdr = IgfsMarshaller.createHeader(reqId, msg.command()); + + IgniteCheckedException err = null; + + try { + synchronized (this) { + marsh.marshall(msg, hdr, out); + + out.flush(); // Blocking operation + sometimes system call. + } + } + catch (IgniteCheckedException e) { + err = e; + } + catch (IOException e) { + err = new HadoopIgfsCommunicationException(e); + } + + if (err != null) { + reqMap.remove(reqId, fut); + + fut.onDone(err); + } + + return fut; + } + finally { + busyLock.readLock().unlock(); + } + } + + /** {@inheritDoc} */ + @Override public void sendPlain(IgfsMessage msg) throws IgniteCheckedException { + if (!busyLock.readLock().tryLock()) + throw new HadoopIgfsCommunicationException("Failed to send message (client is being " + + "concurrently closed)."); + + try { + if (stopping) + throw new HadoopIgfsCommunicationException("Failed to send message (client is being concurrently closed)."); + + assert msg.command() == IgfsIpcCommand.WRITE_BLOCK; + + IgfsStreamControlRequest req = (IgfsStreamControlRequest)msg; + + byte[] hdr = IgfsMarshaller.createHeader(-1, IgfsIpcCommand.WRITE_BLOCK); + + U.longToBytes(req.streamId(), hdr, 12); + U.intToBytes(req.length(), hdr, 20); + + synchronized (this) { + out.write(hdr); + out.write(req.data(), (int)req.position(), req.length()); + + out.flush(); + } + } + catch (IOException e) { + throw new HadoopIgfsCommunicationException(e); + } + finally { + busyLock.readLock().unlock(); + } + } + + /** + * Closes client but does not wait. + * + * @param err Error. + */ + private void close0(@Nullable Throwable err) { + busyLock.writeLock().lock(); + + try { + if (stopping) + return; + + stopping = true; + } + finally { + busyLock.writeLock().unlock(); + } + + if (err == null) + err = new IgniteCheckedException("Failed to perform request (connection was concurrently closed before response " + + "is received)."); + + // Clean up resources. + U.closeQuiet(out); + + if (endpoint != null) + endpoint.close(); + + // Unwind futures. We can safely iterate here because no more futures will be added. + Iterator<HadoopIgfsFuture> it = reqMap.values().iterator(); + + while (it.hasNext()) { + HadoopIgfsFuture fut = it.next(); + + fut.onDone(err); + + it.remove(); + } + + for (HadoopIgfsIpcIoListener lsnr : lsnrs) + lsnr.onClose(); + } + + /** + * Do not extend {@code GridThread} to minimize class dependencies. + */ + private class ReaderThread extends Thread { + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public void run() { + // Error to fail pending futures. + Throwable err = null; + + try { + InputStream in = endpoint.inputStream(); + + IgfsDataInputStream dis = new IgfsDataInputStream(in); + + byte[] hdr = new byte[IgfsMarshaller.HEADER_SIZE]; + byte[] msgHdr = new byte[IgfsControlResponse.RES_HEADER_SIZE]; + + while (!Thread.currentThread().isInterrupted()) { + dis.readFully(hdr); + + long reqId = U.bytesToLong(hdr, 0); + + // We don't wait for write responses, therefore reqId is -1. + if (reqId == -1) { + // We received a response which normally should not be sent. It must contain an error. + dis.readFully(msgHdr); + + assert msgHdr[4] != 0; + + String errMsg = dis.readUTF(); + + // Error code. + dis.readInt(); + + long streamId = dis.readLong(); + + for (HadoopIgfsIpcIoListener lsnr : lsnrs) + lsnr.onError(streamId, errMsg); + } + else { + HadoopIgfsFuture<Object> fut = reqMap.remove(reqId); + + if (fut == null) { + String msg = "Failed to read response from server: response closure is unavailable for " + + "requestId (will close connection):" + reqId; + + log.warn(msg); + + err = new IgniteCheckedException(msg); + + break; + } + else { + try { + IgfsIpcCommand cmd = IgfsIpcCommand.valueOf(U.bytesToInt(hdr, 8)); + + if (log.isDebugEnabled()) + log.debug("Received IGFS response [reqId=" + reqId + ", cmd=" + cmd + ']'); + + Object res = null; + + if (fut.read()) { + dis.readFully(msgHdr); + + boolean hasErr = msgHdr[4] != 0; + + if (hasErr) { + String errMsg = dis.readUTF(); + + // Error code. + Integer errCode = dis.readInt(); + + IgfsControlResponse.throwError(errCode, errMsg); + } + + int blockLen = U.bytesToInt(msgHdr, 5); + + int readLen = Math.min(blockLen, fut.outputLength()); + + if (readLen > 0) { + assert fut.outputBuffer() != null; + + dis.readFully(fut.outputBuffer(), fut.outputOffset(), readLen); + } + + if (readLen != blockLen) { + byte[] buf = new byte[blockLen - readLen]; + + dis.readFully(buf); + + res = buf; + } + } + else + res = marsh.unmarshall(cmd, hdr, dis); + + fut.onDone(res); + } + catch (IgniteCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to apply response closure (will fail request future): " + + e.getMessage()); + + fut.onDone(e); + + err = e; + } + } + } + } + } + catch (EOFException ignored) { + err = new IgniteCheckedException("Failed to read response from server (connection was closed by remote peer)."); + } + catch (IOException e) { + if (!stopping) + log.error("Failed to read data (connection will be closed)", e); + + err = new HadoopIgfsCommunicationException(e); + } + catch (IgniteCheckedException e) { + if (!stopping) + log.error("Failed to obtain endpoint input stream (connection will be closed)", e); + + err = e; + } + finally { + close0(err); + } + } + } + + /** {@inheritDoc} */ + @Override public String toString() { + return getClass().getSimpleName() + " [endpointAddr=" + endpointAddr + ", activeCnt=" + activeCnt + + ", stopping=" + stopping + ']'; + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4c85f120/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIoListener.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIoListener.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIoListener.java new file mode 100644 index 0000000..c2dad82 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIoListener.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.igfs; + +/** + * Listens to the events of {@link HadoopIgfsIpcIo}. + */ +public interface HadoopIgfsIpcIoListener { + /** + * Callback invoked when the IO is being closed. + */ + public void onClose(); + + /** + * Callback invoked when remote error occurs. + * + * @param streamId Stream ID. + * @param errMsg Error message. + */ + public void onError(long streamId, String errMsg); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4c85f120/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsJclLogger.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsJclLogger.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsJclLogger.java new file mode 100644 index 0000000..35fd27c --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsJclLogger.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.igfs; + +import org.apache.commons.logging.*; +import org.apache.ignite.*; +import org.jetbrains.annotations.*; + +/** + * JCL logger wrapper for Hadoop. + */ +public class HadoopIgfsJclLogger implements IgniteLogger { + /** JCL implementation proxy. */ + private Log impl; + + /** + * Constructor. + * + * @param impl JCL implementation to use. + */ + HadoopIgfsJclLogger(Log impl) { + assert impl != null; + + this.impl = impl; + } + + /** {@inheritDoc} */ + @Override public IgniteLogger getLogger(Object ctgr) { + return new HadoopIgfsJclLogger(LogFactory.getLog( + ctgr instanceof Class ? ((Class)ctgr).getName() : String.valueOf(ctgr))); + } + + /** {@inheritDoc} */ + @Override public void trace(String msg) { + impl.trace(msg); + } + + /** {@inheritDoc} */ + @Override public void debug(String msg) { + impl.debug(msg); + } + + /** {@inheritDoc} */ + @Override public void info(String msg) { + impl.info(msg); + } + + /** {@inheritDoc} */ + @Override public void warning(String msg) { + impl.warn(msg); + } + + /** {@inheritDoc} */ + @Override public void warning(String msg, @Nullable Throwable e) { + impl.warn(msg, e); + } + + /** {@inheritDoc} */ + @Override public void error(String msg) { + impl.error(msg); + } + + /** {@inheritDoc} */ + @Override public boolean isQuiet() { + return !isInfoEnabled() && !isDebugEnabled(); + } + + /** {@inheritDoc} */ + @Override public void error(String msg, @Nullable Throwable e) { + impl.error(msg, e); + } + + /** {@inheritDoc} */ + @Override public boolean isTraceEnabled() { + return impl.isTraceEnabled(); + } + + /** {@inheritDoc} */ + @Override public boolean isDebugEnabled() { + return impl.isDebugEnabled(); + } + + /** {@inheritDoc} */ + @Override public boolean isInfoEnabled() { + return impl.isInfoEnabled(); + } + + /** {@inheritDoc} */ + @Nullable @Override public String fileName() { + return null; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return "IgfsHadoopJclLogger [impl=" + impl + ']'; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4c85f120/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java new file mode 100644 index 0000000..662541a --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java @@ -0,0 +1,466 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.igfs; + +import org.apache.commons.logging.*; +import org.apache.ignite.*; +import org.apache.ignite.igfs.*; +import org.apache.ignite.internal.igfs.common.*; +import org.apache.ignite.internal.processors.igfs.*; +import org.apache.ignite.internal.util.lang.*; +import org.jdk8.backport.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; + +import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.*; + +/** + * Communication with external process (TCP or shmem). + */ +public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener { + /** Expected result is boolean. */ + private static final GridPlainClosure<GridPlainFuture<IgfsMessage>, Boolean> BOOL_RES = createClosure(); + + /** Expected result is boolean. */ + private static final GridPlainClosure<GridPlainFuture<IgfsMessage>, Long> LONG_RES = createClosure(); + + /** Expected result is {@code IgfsFile}. */ + private static final GridPlainClosure<GridPlainFuture<IgfsMessage>, IgfsFile> FILE_RES = createClosure(); + + /** Expected result is {@code IgfsHandshakeResponse} */ + private static final GridPlainClosure<GridPlainFuture<IgfsMessage>, + IgfsHandshakeResponse> HANDSHAKE_RES = createClosure(); + + /** Expected result is {@code IgfsStatus} */ + private static final GridPlainClosure<GridPlainFuture<IgfsMessage>, IgfsStatus> STATUS_RES = + createClosure(); + + /** Expected result is {@code IgfsFile}. */ + private static final GridPlainClosure<GridPlainFuture<IgfsMessage>, + IgfsInputStreamDescriptor> STREAM_DESCRIPTOR_RES = createClosure(); + + /** Expected result is {@code IgfsFile}. */ + private static final GridPlainClosure<GridPlainFuture<IgfsMessage>, + Collection<IgfsFile>> FILE_COL_RES = createClosure(); + + /** Expected result is {@code IgfsFile}. */ + private static final GridPlainClosure<GridPlainFuture<IgfsMessage>, + Collection<IgfsPath>> PATH_COL_RES = createClosure(); + + /** Expected result is {@code IgfsPathSummary}. */ + private static final GridPlainClosure<GridPlainFuture<IgfsMessage>, IgfsPathSummary> SUMMARY_RES = + createClosure(); + + /** Expected result is {@code IgfsFile}. */ + private static final GridPlainClosure<GridPlainFuture<IgfsMessage>, + Collection<IgfsBlockLocation>> BLOCK_LOCATION_COL_RES = createClosure(); + + /** Grid name. */ + private final String grid; + + /** IGFS name. */ + private final String igfs; + + /** Client log. */ + private final Log log; + + /** Client IO. */ + private final HadoopIgfsIpcIo io; + + /** Event listeners. */ + private final Map<Long, HadoopIgfsStreamEventListener> lsnrs = new ConcurrentHashMap8<>(); + + /** + * Constructor for TCP endpoint. + * + * @param host Host. + * @param port Port. + * @param grid Grid name. + * @param igfs IGFS name. + * @param log Client logger. + * @throws IOException If failed. + */ + public HadoopIgfsOutProc(String host, int port, String grid, String igfs, Log log) throws IOException { + this(host, port, grid, igfs, false, log); + } + + /** + * Constructor for shmem endpoint. + * + * @param port Port. + * @param grid Grid name. + * @param igfs IGFS name. + * @param log Client logger. + * @throws IOException If failed. + */ + public HadoopIgfsOutProc(int port, String grid, String igfs, Log log) throws IOException { + this(null, port, grid, igfs, true, log); + } + + /** + * Constructor. + * + * @param host Host. + * @param port Port. + * @param grid Grid name. + * @param igfs IGFS name. + * @param shmem Shared memory flag. + * @param log Client logger. + * @throws IOException If failed. + */ + private HadoopIgfsOutProc(String host, int port, String grid, String igfs, boolean shmem, Log log) + throws IOException { + assert host != null && !shmem || host == null && shmem : + "Invalid arguments [host=" + host + ", port=" + port + ", shmem=" + shmem + ']'; + + String endpoint = host != null ? host + ":" + port : "shmem:" + port; + + this.grid = grid; + this.igfs = igfs; + this.log = log; + + io = HadoopIgfsIpcIo.get(log, endpoint); + + io.addEventListener(this); + } + + /** {@inheritDoc} */ + @Override public IgfsHandshakeResponse handshake(String logDir) throws IgniteCheckedException { + final IgfsHandshakeRequest req = new IgfsHandshakeRequest(); + + req.gridName(grid); + req.igfsName(igfs); + req.logDirectory(logDir); + + return io.send(req).chain(HANDSHAKE_RES).get(); + } + + /** {@inheritDoc} */ + @Override public void close(boolean force) { + assert io != null; + + io.removeEventListener(this); + + if (force) + io.forceClose(); + else + io.release(); + } + + /** {@inheritDoc} */ + @Override public IgfsFile info(IgfsPath path) throws IgniteCheckedException { + final IgfsPathControlRequest msg = new IgfsPathControlRequest(); + + msg.command(INFO); + msg.path(path); + + return io.send(msg).chain(FILE_RES).get(); + } + + /** {@inheritDoc} */ + @Override public IgfsFile update(IgfsPath path, Map<String, String> props) throws IgniteCheckedException { + final IgfsPathControlRequest msg = new IgfsPathControlRequest(); + + msg.command(UPDATE); + msg.path(path); + msg.properties(props); + + return io.send(msg).chain(FILE_RES).get(); + } + + /** {@inheritDoc} */ + @Override public Boolean setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteCheckedException { + final IgfsPathControlRequest msg = new IgfsPathControlRequest(); + + msg.command(SET_TIMES); + msg.path(path); + msg.accessTime(accessTime); + msg.modificationTime(modificationTime); + + return io.send(msg).chain(BOOL_RES).get(); + } + + /** {@inheritDoc} */ + @Override public Boolean rename(IgfsPath src, IgfsPath dest) throws IgniteCheckedException { + final IgfsPathControlRequest msg = new IgfsPathControlRequest(); + + msg.command(RENAME); + msg.path(src); + msg.destinationPath(dest); + + return io.send(msg).chain(BOOL_RES).get(); + } + + /** {@inheritDoc} */ + @Override public Boolean delete(IgfsPath path, boolean recursive) throws IgniteCheckedException { + final IgfsPathControlRequest msg = new IgfsPathControlRequest(); + + msg.command(DELETE); + msg.path(path); + msg.flag(recursive); + + return io.send(msg).chain(BOOL_RES).get(); + } + + /** {@inheritDoc} */ + @Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len) + throws IgniteCheckedException { + final IgfsPathControlRequest msg = new IgfsPathControlRequest(); + + msg.command(AFFINITY); + msg.path(path); + msg.start(start); + msg.length(len); + + return io.send(msg).chain(BLOCK_LOCATION_COL_RES).get(); + } + + /** {@inheritDoc} */ + @Override public IgfsPathSummary contentSummary(IgfsPath path) throws IgniteCheckedException { + final IgfsPathControlRequest msg = new IgfsPathControlRequest(); + + msg.command(PATH_SUMMARY); + msg.path(path); + + return io.send(msg).chain(SUMMARY_RES).get(); + } + + /** {@inheritDoc} */ + @Override public Boolean mkdirs(IgfsPath path, Map<String, String> props) throws IgniteCheckedException { + final IgfsPathControlRequest msg = new IgfsPathControlRequest(); + + msg.command(MAKE_DIRECTORIES); + msg.path(path); + msg.properties(props); + + return io.send(msg).chain(BOOL_RES).get(); + } + + /** {@inheritDoc} */ + @Override public Collection<IgfsFile> listFiles(IgfsPath path) throws IgniteCheckedException { + final IgfsPathControlRequest msg = new IgfsPathControlRequest(); + + msg.command(LIST_FILES); + msg.path(path); + + return io.send(msg).chain(FILE_COL_RES).get(); + } + + /** {@inheritDoc} */ + @Override public Collection<IgfsPath> listPaths(IgfsPath path) throws IgniteCheckedException { + final IgfsPathControlRequest msg = new IgfsPathControlRequest(); + + msg.command(LIST_PATHS); + msg.path(path); + + return io.send(msg).chain(PATH_COL_RES).get(); + } + + /** {@inheritDoc} */ + @Override public IgfsStatus fsStatus() throws IgniteCheckedException { + return io.send(new IgfsStatusRequest()).chain(STATUS_RES).get(); + } + + /** {@inheritDoc} */ + @Override public HadoopIgfsStreamDelegate open(IgfsPath path) throws IgniteCheckedException { + final IgfsPathControlRequest msg = new IgfsPathControlRequest(); + + msg.command(OPEN_READ); + msg.path(path); + msg.flag(false); + + IgfsInputStreamDescriptor rmtDesc = io.send(msg).chain(STREAM_DESCRIPTOR_RES).get(); + + return new HadoopIgfsStreamDelegate(this, rmtDesc.streamId(), rmtDesc.length()); + } + + /** {@inheritDoc} */ + @Override public HadoopIgfsStreamDelegate open(IgfsPath path, + int seqReadsBeforePrefetch) throws IgniteCheckedException { + final IgfsPathControlRequest msg = new IgfsPathControlRequest(); + + msg.command(OPEN_READ); + msg.path(path); + msg.flag(true); + msg.sequentialReadsBeforePrefetch(seqReadsBeforePrefetch); + + IgfsInputStreamDescriptor rmtDesc = io.send(msg).chain(STREAM_DESCRIPTOR_RES).get(); + + return new HadoopIgfsStreamDelegate(this, rmtDesc.streamId(), rmtDesc.length()); + } + + /** {@inheritDoc} */ + @Override public HadoopIgfsStreamDelegate create(IgfsPath path, boolean overwrite, boolean colocate, + int replication, long blockSize, @Nullable Map<String, String> props) throws IgniteCheckedException { + final IgfsPathControlRequest msg = new IgfsPathControlRequest(); + + msg.command(OPEN_CREATE); + msg.path(path); + msg.flag(overwrite); + msg.colocate(colocate); + msg.properties(props); + msg.replication(replication); + msg.blockSize(blockSize); + + Long streamId = io.send(msg).chain(LONG_RES).get(); + + return new HadoopIgfsStreamDelegate(this, streamId); + } + + /** {@inheritDoc} */ + @Override public HadoopIgfsStreamDelegate append(IgfsPath path, boolean create, + @Nullable Map<String, String> props) throws IgniteCheckedException { + final IgfsPathControlRequest msg = new IgfsPathControlRequest(); + + msg.command(OPEN_APPEND); + msg.path(path); + msg.flag(create); + msg.properties(props); + + Long streamId = io.send(msg).chain(LONG_RES).get(); + + return new HadoopIgfsStreamDelegate(this, streamId); + } + + /** {@inheritDoc} */ + @Override public GridPlainFuture<byte[]> readData(HadoopIgfsStreamDelegate desc, long pos, int len, + final @Nullable byte[] outBuf, final int outOff, final int outLen) { + assert len > 0; + + final IgfsStreamControlRequest msg = new IgfsStreamControlRequest(); + + msg.command(READ_BLOCK); + msg.streamId((long) desc.target()); + msg.position(pos); + msg.length(len); + + try { + return io.send(msg, outBuf, outOff, outLen); + } + catch (IgniteCheckedException e) { + return new GridPlainFutureAdapter<>(e); + } + } + + /** {@inheritDoc} */ + @Override public void writeData(HadoopIgfsStreamDelegate desc, byte[] data, int off, int len) + throws IOException { + final IgfsStreamControlRequest msg = new IgfsStreamControlRequest(); + + msg.command(WRITE_BLOCK); + msg.streamId((long) desc.target()); + msg.data(data); + msg.position(off); + msg.length(len); + + try { + io.sendPlain(msg); + } + catch (IgniteCheckedException e) { + throw HadoopIgfsUtils.cast(e); + } + } + + /** {@inheritDoc} */ + @Override public void flush(HadoopIgfsStreamDelegate delegate) throws IOException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void closeStream(HadoopIgfsStreamDelegate desc) throws IOException { + final IgfsStreamControlRequest msg = new IgfsStreamControlRequest(); + + msg.command(CLOSE); + msg.streamId((long)desc.target()); + + try { + io.send(msg).chain(BOOL_RES).get(); + } + catch (IgniteCheckedException e) { + throw HadoopIgfsUtils.cast(e); + } + } + + /** {@inheritDoc} */ + @Override public void addEventListener(HadoopIgfsStreamDelegate desc, + HadoopIgfsStreamEventListener lsnr) { + long streamId = desc.target(); + + HadoopIgfsStreamEventListener lsnr0 = lsnrs.put(streamId, lsnr); + + assert lsnr0 == null || lsnr0 == lsnr; + + if (log.isDebugEnabled()) + log.debug("Added stream event listener [streamId=" + streamId + ']'); + } + + /** {@inheritDoc} */ + @Override public void removeEventListener(HadoopIgfsStreamDelegate desc) { + long streamId = desc.target(); + + HadoopIgfsStreamEventListener lsnr0 = lsnrs.remove(streamId); + + if (lsnr0 != null && log.isDebugEnabled()) + log.debug("Removed stream event listener [streamId=" + streamId + ']'); + } + + /** {@inheritDoc} */ + @Override public void onClose() { + for (HadoopIgfsStreamEventListener lsnr : lsnrs.values()) { + try { + lsnr.onClose(); + } + catch (IgniteCheckedException e) { + log.warn("Got exception from stream event listener (will ignore): " + lsnr, e); + } + } + } + + /** {@inheritDoc} */ + @Override public void onError(long streamId, String errMsg) { + HadoopIgfsStreamEventListener lsnr = lsnrs.get(streamId); + + if (lsnr != null) + lsnr.onError(errMsg); + else + log.warn("Received write error response for not registered output stream (will ignore) " + + "[streamId= " + streamId + ']'); + } + + /** + * Creates conversion closure for given type. + * + * @param <T> Type of expected result. + * @return Conversion closure. + */ + @SuppressWarnings("unchecked") + private static <T> GridPlainClosure<GridPlainFuture<IgfsMessage>, T> createClosure() { + return new GridPlainClosure<GridPlainFuture<IgfsMessage>, T>() { + @Override public T apply(GridPlainFuture<IgfsMessage> fut) throws IgniteCheckedException { + IgfsControlResponse res = (IgfsControlResponse)fut.get(); + + if (res.hasError()) + res.throwError(); + + return (T)res.response(); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4c85f120/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutputStream.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutputStream.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutputStream.java new file mode 100644 index 0000000..902d710 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutputStream.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.igfs; + +import org.apache.commons.logging.*; +import org.apache.ignite.*; +import org.apache.ignite.internal.igfs.common.*; +import org.jetbrains.annotations.*; + +import java.io.*; + +/** + * IGFS Hadoop output stream implementation. + */ +public class HadoopIgfsOutputStream extends OutputStream implements HadoopIgfsStreamEventListener { + /** Log instance. */ + private Log log; + + /** Client logger. */ + private IgfsLogger clientLog; + + /** Log stream ID. */ + private long logStreamId; + + /** Server stream delegate. */ + private HadoopIgfsStreamDelegate delegate; + + /** Closed flag. */ + private volatile boolean closed; + + /** Flag set if stream was closed due to connection breakage. */ + private boolean connBroken; + + /** Error message. */ + private volatile String errMsg; + + /** Read time. */ + private long writeTime; + + /** User time. */ + private long userTime; + + /** Last timestamp. */ + private long lastTs; + + /** Amount of written bytes. */ + private long total; + + /** + * Creates light output stream. + * + * @param delegate Server stream delegate. + * @param log Logger to use. + * @param clientLog Client logger. + */ + public HadoopIgfsOutputStream(HadoopIgfsStreamDelegate delegate, Log log, + IgfsLogger clientLog, long logStreamId) { + this.delegate = delegate; + this.log = log; + this.clientLog = clientLog; + this.logStreamId = logStreamId; + + lastTs = System.nanoTime(); + + delegate.hadoop().addEventListener(delegate, this); + } + + /** + * Read start. + */ + private void writeStart() { + long now = System.nanoTime(); + + userTime += now - lastTs; + + lastTs = now; + } + + /** + * Read end. + */ + private void writeEnd() { + long now = System.nanoTime(); + + writeTime += now - lastTs; + + lastTs = now; + } + + /** {@inheritDoc} */ + @Override public void write(@NotNull byte[] b, int off, int len) throws IOException { + check(); + + writeStart(); + + try { + delegate.hadoop().writeData(delegate, b, off, len); + + total += len; + } + finally { + writeEnd(); + } + } + + /** {@inheritDoc} */ + @Override public void write(int b) throws IOException { + write(new byte[] {(byte)b}); + + total++; + } + + /** {@inheritDoc} */ + @Override public void flush() throws IOException { + delegate.hadoop().flush(delegate); + } + + /** {@inheritDoc} */ + @Override public void close() throws IOException { + if (!closed) { + if (log.isDebugEnabled()) + log.debug("Closing output stream: " + delegate); + + writeStart(); + + delegate.hadoop().closeStream(delegate); + + markClosed(false); + + writeEnd(); + + if (clientLog.isLogEnabled()) + clientLog.logCloseOut(logStreamId, userTime, writeTime, total); + + if (log.isDebugEnabled()) + log.debug("Closed output stream [delegate=" + delegate + ", writeTime=" + writeTime / 1000 + + ", userTime=" + userTime / 1000 + ']'); + } + else if(connBroken) + throw new IOException( + "Failed to close stream, because connection was broken (data could have been lost)."); + } + + /** + * Marks stream as closed. + * + * @param connBroken {@code True} if connection with server was lost. + */ + private void markClosed(boolean connBroken) { + // It is ok to have race here. + if (!closed) { + closed = true; + + delegate.hadoop().removeEventListener(delegate); + + this.connBroken = connBroken; + } + } + + /** + * @throws IOException If check failed. + */ + private void check() throws IOException { + String errMsg0 = errMsg; + + if (errMsg0 != null) + throw new IOException(errMsg0); + + if (closed) { + if (connBroken) + throw new IOException("Server connection was lost."); + else + throw new IOException("Stream is closed."); + } + } + + /** {@inheritDoc} */ + @Override public void onClose() throws IgniteCheckedException { + markClosed(true); + } + + /** {@inheritDoc} */ + @Override public void onError(String errMsg) { + this.errMsg = errMsg; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4c85f120/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProperties.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProperties.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProperties.java new file mode 100644 index 0000000..f99f14c --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProperties.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.igfs; + +import org.apache.hadoop.fs.permission.*; +import org.apache.ignite.*; + +import java.util.*; + +import static org.apache.ignite.IgniteFs.*; + +/** + * Hadoop file system properties. + */ +public class HadoopIgfsProperties { + /** Username. */ + private String usrName; + + /** Group name. */ + private String grpName; + + /** Permissions. */ + private FsPermission perm; + + /** + * Constructor. + * + * @param props Properties. + * @throws IgniteException In case of error. + */ + public HadoopIgfsProperties(Map<String, String> props) throws IgniteException { + usrName = props.get(PROP_USER_NAME); + grpName = props.get(PROP_GROUP_NAME); + + String permStr = props.get(PROP_PERMISSION); + + if (permStr != null) { + try { + perm = new FsPermission((short)Integer.parseInt(permStr, 8)); + } + catch (NumberFormatException ignore) { + throw new IgniteException("Permissions cannot be parsed: " + permStr); + } + } + } + + /** + * Get user name. + * + * @return User name. + */ + public String userName() { + return usrName; + } + + /** + * Get group name. + * + * @return Group name. + */ + public String groupName() { + return grpName; + } + + /** + * Get permission. + * + * @return Permission. + */ + public FsPermission permission() { + return perm; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4c85f120/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProxyInputStream.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProxyInputStream.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProxyInputStream.java new file mode 100644 index 0000000..4530e64 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProxyInputStream.java @@ -0,0 +1,335 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.igfs; + +import org.apache.hadoop.fs.*; +import org.apache.ignite.internal.igfs.common.*; + +import java.io.*; + +/** + * Secondary Hadoop file system input stream wrapper. + */ +public class HadoopIgfsProxyInputStream extends InputStream implements Seekable, PositionedReadable { + /** Actual input stream to the secondary file system. */ + private final FSDataInputStream is; + + /** Client logger. */ + private final IgfsLogger clientLog; + + /** Log stream ID. */ + private final long logStreamId; + + /** Read time. */ + private long readTime; + + /** User time. */ + private long userTime; + + /** Last timestamp. */ + private long lastTs; + + /** Amount of read bytes. */ + private long total; + + /** Closed flag. */ + private boolean closed; + + /** + * Constructor. + * + * @param is Actual input stream to the secondary file system. + * @param clientLog Client log. + */ + public HadoopIgfsProxyInputStream(FSDataInputStream is, IgfsLogger clientLog, long logStreamId) { + assert is != null; + assert clientLog != null; + + this.is = is; + this.clientLog = clientLog; + this.logStreamId = logStreamId; + + lastTs = System.nanoTime(); + } + + /** {@inheritDoc} */ + @Override public synchronized int read(byte[] b) throws IOException { + readStart(); + + int res; + + try { + res = is.read(b); + } + finally { + readEnd(); + } + + if (res != -1) + total += res; + + return res; + } + + /** {@inheritDoc} */ + @Override public synchronized int read(byte[] b, int off, int len) throws IOException { + readStart(); + + int res; + + try { + res = super.read(b, off, len); + } + finally { + readEnd(); + } + + if (res != -1) + total += res; + + return res; + } + + /** {@inheritDoc} */ + @Override public synchronized long skip(long n) throws IOException { + readStart(); + + long res; + + try { + res = is.skip(n); + } + finally { + readEnd(); + } + + if (clientLog.isLogEnabled()) + clientLog.logSkip(logStreamId, res); + + return res; + } + + /** {@inheritDoc} */ + @Override public synchronized int available() throws IOException { + readStart(); + + try { + return is.available(); + } + finally { + readEnd(); + } + } + + /** {@inheritDoc} */ + @Override public synchronized void close() throws IOException { + if (!closed) { + closed = true; + + readStart(); + + try { + is.close(); + } + finally { + readEnd(); + } + + if (clientLog.isLogEnabled()) + clientLog.logCloseIn(logStreamId, userTime, readTime, total); + } + } + + /** {@inheritDoc} */ + @Override public synchronized void mark(int readLimit) { + readStart(); + + try { + is.mark(readLimit); + } + finally { + readEnd(); + } + + if (clientLog.isLogEnabled()) + clientLog.logMark(logStreamId, readLimit); + } + + /** {@inheritDoc} */ + @Override public synchronized void reset() throws IOException { + readStart(); + + try { + is.reset(); + } + finally { + readEnd(); + } + + if (clientLog.isLogEnabled()) + clientLog.logReset(logStreamId); + } + + /** {@inheritDoc} */ + @Override public synchronized boolean markSupported() { + readStart(); + + try { + return is.markSupported(); + } + finally { + readEnd(); + } + } + + /** {@inheritDoc} */ + @Override public synchronized int read() throws IOException { + readStart(); + + int res; + + try { + res = is.read(); + } + finally { + readEnd(); + } + + if (res != -1) + total++; + + return res; + } + + /** {@inheritDoc} */ + @Override public synchronized int read(long pos, byte[] buf, int off, int len) throws IOException { + readStart(); + + int res; + + try { + res = is.read(pos, buf, off, len); + } + finally { + readEnd(); + } + + if (res != -1) + total += res; + + if (clientLog.isLogEnabled()) + clientLog.logRandomRead(logStreamId, pos, res); + + return res; + } + + /** {@inheritDoc} */ + @Override public synchronized void readFully(long pos, byte[] buf, int off, int len) throws IOException { + readStart(); + + try { + is.readFully(pos, buf, off, len); + } + finally { + readEnd(); + } + + total += len; + + if (clientLog.isLogEnabled()) + clientLog.logRandomRead(logStreamId, pos, len); + } + + /** {@inheritDoc} */ + @Override public synchronized void readFully(long pos, byte[] buf) throws IOException { + readStart(); + + try { + is.readFully(pos, buf); + } + finally { + readEnd(); + } + + total += buf.length; + + if (clientLog.isLogEnabled()) + clientLog.logRandomRead(logStreamId, pos, buf.length); + } + + /** {@inheritDoc} */ + @Override public synchronized void seek(long pos) throws IOException { + readStart(); + + try { + is.seek(pos); + } + finally { + readEnd(); + } + + if (clientLog.isLogEnabled()) + clientLog.logSeek(logStreamId, pos); + } + + /** {@inheritDoc} */ + @Override public synchronized long getPos() throws IOException { + readStart(); + + try { + return is.getPos(); + } + finally { + readEnd(); + } + } + + /** {@inheritDoc} */ + @Override public synchronized boolean seekToNewSource(long targetPos) throws IOException { + readStart(); + + try { + return is.seekToNewSource(targetPos); + } + finally { + readEnd(); + } + } + + /** + * Read start. + */ + private void readStart() { + long now = System.nanoTime(); + + userTime += now - lastTs; + + lastTs = now; + } + + /** + * Read end. + */ + private void readEnd() { + long now = System.nanoTime(); + + readTime += now - lastTs; + + lastTs = now; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4c85f120/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProxyOutputStream.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProxyOutputStream.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProxyOutputStream.java new file mode 100644 index 0000000..9ab552e --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProxyOutputStream.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.igfs; + +import org.apache.hadoop.fs.*; +import org.apache.ignite.internal.igfs.common.*; + +import java.io.*; + +/** + * Secondary Hadoop file system output stream wrapper. + */ +public class HadoopIgfsProxyOutputStream extends OutputStream { + /** Actual output stream. */ + private FSDataOutputStream os; + + /** Client logger. */ + private final IgfsLogger clientLog; + + /** Log stream ID. */ + private final long logStreamId; + + /** Read time. */ + private long writeTime; + + /** User time. */ + private long userTime; + + /** Last timestamp. */ + private long lastTs; + + /** Amount of written bytes. */ + private long total; + + /** Closed flag. */ + private boolean closed; + + /** + * Constructor. + * + * @param os Actual output stream. + * @param clientLog Client logger. + * @param logStreamId Log stream ID. + */ + public HadoopIgfsProxyOutputStream(FSDataOutputStream os, IgfsLogger clientLog, long logStreamId) { + assert os != null; + assert clientLog != null; + + this.os = os; + this.clientLog = clientLog; + this.logStreamId = logStreamId; + + lastTs = System.nanoTime(); + } + + /** {@inheritDoc} */ + @Override public synchronized void write(int b) throws IOException { + writeStart(); + + try { + os.write(b); + } + finally { + writeEnd(); + } + + total++; + } + + /** {@inheritDoc} */ + @Override public synchronized void write(byte[] b) throws IOException { + writeStart(); + + try { + os.write(b); + } + finally { + writeEnd(); + } + + total += b.length; + } + + /** {@inheritDoc} */ + @Override public synchronized void write(byte[] b, int off, int len) throws IOException { + writeStart(); + + try { + os.write(b, off, len); + } + finally { + writeEnd(); + } + + total += len; + } + + /** {@inheritDoc} */ + @Override public synchronized void flush() throws IOException { + writeStart(); + + try { + os.flush(); + } + finally { + writeEnd(); + } + } + + /** {@inheritDoc} */ + @Override public synchronized void close() throws IOException { + if (!closed) { + closed = true; + + writeStart(); + + try { + os.close(); + } + finally { + writeEnd(); + } + + if (clientLog.isLogEnabled()) + clientLog.logCloseOut(logStreamId, userTime, writeTime, total); + } + } + + /** + * Read start. + */ + private void writeStart() { + long now = System.nanoTime(); + + userTime += now - lastTs; + + lastTs = now; + } + + /** + * Read end. + */ + private void writeEnd() { + long now = System.nanoTime(); + + writeTime += now - lastTs; + + lastTs = now; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4c85f120/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsReader.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsReader.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsReader.java new file mode 100644 index 0000000..f410fae --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsReader.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.igfs; + +import org.apache.hadoop.fs.*; +import org.apache.hadoop.fs.FileSystem; +import org.apache.ignite.igfs.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.io.*; + +/** + * Secondary file system input stream wrapper which actually opens input stream only in case it is explicitly + * requested. + * <p> + * The class is expected to be used only from synchronized context and therefore is not tread-safe. + */ +public class HadoopIgfsReader implements IgfsReader { + /** Secondary file system. */ + private final FileSystem fs; + + /** Path to the file to open. */ + private final Path path; + + /** Buffer size. */ + private final int bufSize; + + /** Actual input stream. */ + private FSDataInputStream in; + + /** Cached error occurred during output stream open. */ + private IOException err; + + /** Flag indicating that the stream was already opened. */ + private boolean opened; + + /** + * Constructor. + * + * @param fs Secondary file system. + * @param path Path to the file to open. + * @param bufSize Buffer size. + */ + public HadoopIgfsReader(FileSystem fs, Path path, int bufSize) { + assert fs != null; + assert path != null; + + this.fs = fs; + this.path = path; + this.bufSize = bufSize; + } + + /** Get input stream. */ + private PositionedReadable in() throws IOException { + if (opened) { + if (err != null) + throw err; + } + else { + opened = true; + + try { + in = fs.open(path, bufSize); + + if (in == null) + throw new IOException("Failed to open input stream (file system returned null): " + path); + } + catch (IOException e) { + err = e; + + throw err; + } + } + + return in; + } + + /** + * Close wrapped input stream in case it was previously opened. + */ + @Override public void close() { + U.closeQuiet(in); + } + + /** {@inheritDoc} */ + @Override public int read(long pos, byte[] buf, int off, int len) throws IOException { + return in().read(pos, buf, off, len); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4c85f120/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsStreamDelegate.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsStreamDelegate.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsStreamDelegate.java new file mode 100644 index 0000000..54f7377 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsStreamDelegate.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.igfs; + +import org.apache.ignite.internal.util.typedef.internal.*; + +/** + * IGFS Hadoop stream descriptor. + */ +public class HadoopIgfsStreamDelegate { + /** RPC handler. */ + private final HadoopIgfsEx hadoop; + + /** Target. */ + private final Object target; + + /** Optional stream length. */ + private final long len; + + /** + * Constructor. + * + * @param target Target. + */ + public HadoopIgfsStreamDelegate(HadoopIgfsEx hadoop, Object target) { + this(hadoop, target, -1); + } + + /** + * Constructor. + * + * @param target Target. + * @param len Optional length. + */ + public HadoopIgfsStreamDelegate(HadoopIgfsEx hadoop, Object target, long len) { + assert hadoop != null; + assert target != null; + + this.hadoop = hadoop; + this.target = target; + this.len = len; + } + + /** + * @return RPC handler. + */ + public HadoopIgfsEx hadoop() { + return hadoop; + } + + /** + * @return Stream target. + */ + @SuppressWarnings("unchecked") + public <T> T target() { + return (T) target; + } + + /** + * @return Length. + */ + public long length() { + return len; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return System.identityHashCode(target); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object obj) { + return obj != null && obj instanceof HadoopIgfsStreamDelegate && + target == ((HadoopIgfsStreamDelegate)obj).target; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(HadoopIgfsStreamDelegate.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4c85f120/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsStreamEventListener.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsStreamEventListener.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsStreamEventListener.java new file mode 100644 index 0000000..6b3fa82 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsStreamEventListener.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.igfs; + +import org.apache.ignite.*; + +/** + * IGFS input stream event listener. + */ +public interface HadoopIgfsStreamEventListener { + /** + * Callback invoked when the stream is being closed. + * + * @throws IgniteCheckedException If failed. + */ + public void onClose() throws IgniteCheckedException; + + /** + * Callback invoked when remote error occurs. + * + * @param errMsg Error message. + */ + public void onError(String errMsg); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4c85f120/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsUtils.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsUtils.java new file mode 100644 index 0000000..e30a4ec --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsUtils.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.igfs; + +import org.apache.hadoop.conf.*; +import org.apache.hadoop.fs.*; +import org.apache.ignite.*; +import org.apache.ignite.igfs.*; +import org.apache.ignite.internal.processors.igfs.*; +import org.jetbrains.annotations.*; + +import java.io.*; + +/** + * Utility constants and methods for IGFS Hadoop file system. + */ +public class HadoopIgfsUtils { + /** Parameter name for endpoint no embed mode flag. */ + public static final String PARAM_IGFS_ENDPOINT_NO_EMBED = "fs.igfs.%s.endpoint.no_embed"; + + /** Parameter name for endpoint no shared memory flag. */ + public static final String PARAM_IGFS_ENDPOINT_NO_LOCAL_SHMEM = "fs.igfs.%s.endpoint.no_local_shmem"; + + /** Parameter name for endpoint no local TCP flag. */ + public static final String PARAM_IGFS_ENDPOINT_NO_LOCAL_TCP = "fs.igfs.%s.endpoint.no_local_tcp"; + + /** + * Get string parameter. + * + * @param cfg Configuration. + * @param name Parameter name. + * @param authority Authority. + * @param dflt Default value. + * @return String value. + */ + public static String parameter(Configuration cfg, String name, String authority, String dflt) { + return cfg.get(String.format(name, authority != null ? authority : ""), dflt); + } + + /** + * Get integer parameter. + * + * @param cfg Configuration. + * @param name Parameter name. + * @param authority Authority. + * @param dflt Default value. + * @return Integer value. + * @throws IOException In case of parse exception. + */ + public static int parameter(Configuration cfg, String name, String authority, int dflt) throws IOException { + String name0 = String.format(name, authority != null ? authority : ""); + + try { + return cfg.getInt(name0, dflt); + } + catch (NumberFormatException ignore) { + throw new IOException("Failed to parse parameter value to integer: " + name0); + } + } + + /** + * Get boolean parameter. + * + * @param cfg Configuration. + * @param name Parameter name. + * @param authority Authority. + * @param dflt Default value. + * @return Boolean value. + */ + public static boolean parameter(Configuration cfg, String name, String authority, boolean dflt) { + return cfg.getBoolean(String.format(name, authority != null ? authority : ""), dflt); + } + + /** + * Cast Ignite exception to appropriate IO exception. + * + * @param e Exception to cast. + * @return Casted exception. + */ + public static IOException cast(IgniteCheckedException e) { + return cast(e, null); + } + + /** + * Cast Ignite exception to appropriate IO exception. + * + * @param e Exception to cast. + * @param path Path for exceptions. + * @return Casted exception. + */ + @SuppressWarnings("unchecked") + public static IOException cast(IgniteCheckedException e, @Nullable String path) { + assert e != null; + + // First check for any nested IOException; if exists - re-throw it. + if (e.hasCause(IOException.class)) + return e.getCause(IOException.class); + else if (e.hasCause(IgfsFileNotFoundException.class)) + return new FileNotFoundException(path); // TODO: Or PathNotFoundException? + else if (e.hasCause(IgfsParentNotDirectoryException.class)) + return new ParentNotDirectoryException(path); + else if (path != null && e.hasCause(IgfsDirectoryNotEmptyException.class)) + return new PathIsNotEmptyDirectoryException(path); + else if (path != null && e.hasCause(IgfsPathAlreadyExistsException.class)) + return new PathExistsException(path); + else + return new IOException(e); + } + + /** + * Constructor. + */ + private HadoopIgfsUtils() { + // No-op. + } +}