http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ac05bb9c/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java index 0000000,d22f2c9..679fec1 mode 000000,100644..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java @@@ -1,0 -1,707 +1,707 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.ignite.internal.util.ipc.shmem; + + import org.apache.ignite.*; + import org.apache.ignite.internal.util.*; + import org.apache.ignite.resources.*; + import org.apache.ignite.thread.*; + import org.apache.ignite.internal.processors.resource.*; + import org.apache.ignite.internal.util.typedef.*; + import org.apache.ignite.internal.util.typedef.internal.*; + import org.apache.ignite.internal.util.ipc.*; + import org.apache.ignite.internal.util.lang.*; + import org.apache.ignite.internal.util.tostring.*; + import org.apache.ignite.internal.util.worker.*; + import org.jetbrains.annotations.*; + + import java.io.*; + import java.net.*; + import java.nio.channels.*; + import java.util.*; + import java.util.concurrent.atomic.*; + + /** + * Server shared memory IPC endpoint. + */ + public class IpcSharedMemoryServerEndpoint implements IpcServerEndpoint { + /** Troubleshooting public wiki page. */ + public static final String TROUBLESHOOTING_URL = "http://bit.ly/GridGain-Troubleshooting"; + + /** IPC error message. */ + public static final String OUT_OF_RESOURCES_MSG = "Failed to allocate shared memory segment " + + "(for troubleshooting see " + TROUBLESHOOTING_URL + ')'; + + /** Default endpoint port number. */ + public static final int DFLT_IPC_PORT = 10500; + + /** Default shared memory space in bytes. */ + public static final int DFLT_SPACE_SIZE = 256 * 1024; + + /** - * Default token directory. Note that this path is relative to {@code GRIDGAIN_HOME/work} folder - * if {@code GRIDGAIN_HOME} system or environment variable specified, otherwise it is relative to ++ * Default token directory. Note that this path is relative to {@code IGNITE_HOME/work} folder ++ * if {@code IGNITE_HOME} system or environment variable specified, otherwise it is relative to + * {@code work} folder under system {@code java.io.tmpdir} folder. + * + * @see org.apache.ignite.configuration.IgniteConfiguration#getWorkDirectory() + */ + public static final String DFLT_TOKEN_DIR_PATH = "ipc/shmem"; + + /** + * Shared memory token file name prefix. + * + * Token files are created and stored in the following manner: [tokDirPath]/[nodeId]-[current + * PID]/gg-shmem-space-[auto_idx]-[other_party_pid]-[size] + */ + public static final String TOKEN_FILE_NAME = "gg-shmem-space-"; + + /** Default lock file name. */ + private static final String LOCK_FILE_NAME = "lock.file"; + + /** GC frequency. */ + private static final long GC_FREQ = 10000; + + /** ID generator. */ + private static final AtomicLong tokIdxGen = new AtomicLong(); + + /** Port to bind socket to. */ + private int port = DFLT_IPC_PORT; + + /** Prefix. */ + private String tokDirPath = DFLT_TOKEN_DIR_PATH; + + /** Space size. */ + private int size = DFLT_SPACE_SIZE; + + /** Server socket. */ + @GridToStringExclude + private ServerSocket srvSock; + + /** Token directory. */ + private File tokDir; + + /** Logger. */ + @IgniteLoggerResource + private IgniteLogger log; + + /** Local node ID. */ + private UUID locNodeId; + + /** Grid name. */ + private String gridName; + + /** Flag allowing not to print out of resources warning. */ + private boolean omitOutOfResourcesWarn; + + /** GC worker. */ + private GridWorker gcWorker; + + /** Pid of the current process. */ + private int pid; + + /** Closed flag. */ + private volatile boolean closed; + + /** Spaces opened on with this endpoint. */ + private final Collection<IpcSharedMemoryClientEndpoint> endpoints = + new GridConcurrentHashSet<>(); + + /** Use this constructor when dependencies could be injected with {@link GridResourceProcessor#injectGeneric(Object)}. */ + public IpcSharedMemoryServerEndpoint() { + // No-op. + } + + /** + * Constructor to set dependencies explicitly. + * + * @param log Log. + * @param locNodeId Node id. + * @param gridName Grid name. + */ + public IpcSharedMemoryServerEndpoint(IgniteLogger log, UUID locNodeId, String gridName) { + this.log = log; + this.locNodeId = locNodeId; + this.gridName = gridName; + } + + /** @param omitOutOfResourcesWarn If {@code true}, out of resources warning will not be printed by server. */ + public void omitOutOfResourcesWarning(boolean omitOutOfResourcesWarn) { + this.omitOutOfResourcesWarn = omitOutOfResourcesWarn; + } + + /** {@inheritDoc} */ + @Override public void start() throws IgniteCheckedException { + IpcSharedMemoryNativeLoader.load(); + + pid = IpcSharedMemoryUtils.pid(); + + if (pid == -1) + throw new IpcEndpointBindException("Failed to get PID of the current process."); + + if (size <= 0) + throw new IpcEndpointBindException("Space size should be positive: " + size); + + String tokDirPath = this.tokDirPath; + + if (F.isEmpty(tokDirPath)) + throw new IpcEndpointBindException("Token directory path is empty."); + + tokDirPath = tokDirPath + '/' + locNodeId.toString() + '-' + IpcSharedMemoryUtils.pid(); + + tokDir = U.resolveWorkDirectory(tokDirPath, false); + + if (port <= 0 || port >= 0xffff) + throw new IpcEndpointBindException("Port value is illegal: " + port); + + try { + srvSock = new ServerSocket(); + + // Always bind to loopback. + srvSock.bind(new InetSocketAddress("127.0.0.1", port)); + } + catch (IOException e) { + // Although empty socket constructor never throws exception, close it just in case. + U.closeQuiet(srvSock); + + throw new IpcEndpointBindException("Failed to bind shared memory IPC endpoint (is port already " + + "in use?): " + port, e); + } + + gcWorker = new GcWorker(gridName, "ipc-shmem-gc", log); + + new IgniteThread(gcWorker).start(); + + if (log.isInfoEnabled()) + log.info("IPC shared memory server endpoint started [port=" + port + + ", tokDir=" + tokDir.getAbsolutePath() + ']'); + } + + /** {@inheritDoc} */ + @SuppressWarnings("ErrorNotRethrown") + @Override public IpcEndpoint accept() throws IgniteCheckedException { + while (!Thread.currentThread().isInterrupted()) { + Socket sock = null; + + boolean accepted = false; + + try { + sock = srvSock.accept(); + + accepted = true; + + InputStream inputStream = sock.getInputStream(); + ObjectInputStream in = new ObjectInputStream(inputStream); + + ObjectOutputStream out = new ObjectOutputStream(sock.getOutputStream()); + + IpcSharedMemorySpace inSpace = null; + + IpcSharedMemorySpace outSpace = null; + + boolean err = true; + + try { + IpcSharedMemoryInitRequest req = (IpcSharedMemoryInitRequest)in.readObject(); + + if (log.isDebugEnabled()) + log.debug("Processing request: " + req); + + IgnitePair<String> p = inOutToken(req.pid(), size); + + String file1 = p.get1(); + String file2 = p.get2(); + + assert file1 != null; + assert file2 != null; + + // Create tokens. + new File(file1).createNewFile(); + new File(file2).createNewFile(); + + if (log.isDebugEnabled()) + log.debug("Created token files: " + p); + + inSpace = new IpcSharedMemorySpace( + file1, + req.pid(), + pid, + size, + true, + log); + + outSpace = new IpcSharedMemorySpace( + file2, + pid, + req.pid(), + size, + false, + log); + + IpcSharedMemoryClientEndpoint ret = new IpcSharedMemoryClientEndpoint(inSpace, outSpace, + log); + + out.writeObject(new IpcSharedMemoryInitResponse(file2, outSpace.sharedMemoryId(), + file1, inSpace.sharedMemoryId(), pid, size)); + + err = !in.readBoolean(); + + endpoints.add(ret); + + return ret; + } + catch (UnsatisfiedLinkError e) { + throw IpcSharedMemoryUtils.linkError(e); + } + catch (IOException e) { + if (log.isDebugEnabled()) + log.debug("Failed to process incoming connection " + + "(was connection closed by another party):" + e.getMessage()); + } + catch (ClassNotFoundException e) { + U.error(log, "Failed to process incoming connection.", e); + } + catch (ClassCastException e) { + String msg = "Failed to process incoming connection (most probably, shared memory " + + "rest endpoint has been configured by mistake)."; + + LT.warn(log, null, msg); + + sendErrorResponse(out, e); + } + catch (IpcOutOfSystemResourcesException e) { + if (!omitOutOfResourcesWarn) + LT.warn(log, null, OUT_OF_RESOURCES_MSG); + + sendErrorResponse(out, e); + } + catch (IgniteCheckedException e) { + LT.error(log, e, "Failed to process incoming shared memory connection."); + + sendErrorResponse(out, e); + } + finally { + // Exception has been thrown, need to free system resources. + if (err) { + if (inSpace != null) + inSpace.forceClose(); + + // Safety. + if (outSpace != null) + outSpace.forceClose(); + } + } + } + catch (IOException e) { + if (!Thread.currentThread().isInterrupted() && !accepted) + throw new IgniteCheckedException("Failed to accept incoming connection.", e); + + if (!closed) + LT.error(log, null, "Failed to process incoming shared memory connection: " + e.getMessage()); + } + finally { + U.closeQuiet(sock); + } + } // while + + throw new IgniteInterruptedException("Socket accept was interrupted."); + } + + /** + * Injects resources. + * + * @param ignite Ignite + */ + @IgniteInstanceResource + private void injectResources(Ignite ignite){ + if (ignite != null) { + // Inject resources. + gridName = ignite.name(); + locNodeId = ignite.configuration().getNodeId(); + } + else { + // Cleanup resources. + gridName = null; + locNodeId = null; + } + } + + /** + * @param out Output stream. + * @param err Error cause. + */ + private void sendErrorResponse(ObjectOutput out, Exception err) { + try { + out.writeObject(new IpcSharedMemoryInitResponse(err)); + } + catch (IOException e) { + U.error(log, "Failed to send error response to client.", e); + } + } + + /** + * @param pid PID of the other party. + * @param size Size of the space. + * @return Token pair. + */ + private IgnitePair<String> inOutToken(int pid, int size) { + while (true) { + long idx = tokIdxGen.get(); + + if (tokIdxGen.compareAndSet(idx, idx + 2)) + return F.pair(new File(tokDir, TOKEN_FILE_NAME + idx + "-" + pid + "-" + size).getAbsolutePath(), + new File(tokDir, TOKEN_FILE_NAME + (idx + 1) + "-" + pid + "-" + size).getAbsolutePath()); + } + } + + /** {@inheritDoc} */ + @Override public int getPort() { + return port; + } + + /** {@inheritDoc} */ + @Nullable @Override public String getHost() { + return null; + } + + /** + * {@inheritDoc} + * + * @return {@code false} as shared memory endpoints can not be used for management. + */ + @Override public boolean isManagement() { + return false; + } + + /** + * Sets port endpoint will be bound to. + * + * @param port Port number. + */ + public void setPort(int port) { + this.port = port; + } + + /** + * Gets token directory path. + * + * @return Token directory path. + */ + public String getTokenDirectoryPath() { + return tokDirPath; + } + + /** + * Sets token directory path. + * + * @param tokDirPath Token directory path. + */ + public void setTokenDirectoryPath(String tokDirPath) { + this.tokDirPath = tokDirPath; + } + + /** + * Gets size of shared memory spaces that are created by the endpoint. + * + * @return Size of shared memory space. + */ + public int getSize() { + return size; + } + + /** + * Sets size of shared memory spaces that are created by the endpoint. + * + * @param size Size of shared memory space. + */ + public void setSize(int size) { + this.size = size; + } + + /** {@inheritDoc} */ + @Override public void close() { + closed = true; + + U.closeQuiet(srvSock); + + if (gcWorker != null) { + U.cancel(gcWorker); + + // This method may be called from already interrupted thread. + // Need to ensure cleaning on close. + boolean interrupted = Thread.interrupted(); + + try { + U.join(gcWorker); + } + catch (IgniteInterruptedException e) { + U.warn(log, "Interrupted when stopping GC worker.", e); + } + finally { + if (interrupted) + Thread.currentThread().interrupt(); + } + } + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IpcSharedMemoryServerEndpoint.class, this); + } + + /** + * Sets configuration properties from the map. + * + * @param endpointCfg Map of properties. + * @throws IgniteCheckedException If invalid property name or value. + */ + public void setupConfiguration(Map<String, String> endpointCfg) throws IgniteCheckedException { + for (Map.Entry<String,String> e : endpointCfg.entrySet()) { + try { + switch (e.getKey()) { + case "type": + case "host": + case "management": + //Ignore these properties + break; + + case "port": + setPort(Integer.parseInt(e.getValue())); + break; + + case "size": + setSize(Integer.parseInt(e.getValue())); + break; + + case "tokenDirectoryPath": + setTokenDirectoryPath(e.getValue()); + break; + + default: + throw new IgniteCheckedException("Invalid property '" + e.getKey() + "' of " + getClass().getSimpleName()); + } + } + catch (Throwable t) { + if (t instanceof IgniteCheckedException) + throw t; + + throw new IgniteCheckedException("Invalid value '" + e.getValue() + "' of the property '" + e.getKey() + "' in " + + getClass().getSimpleName(), t); + } + } + } + + /** + * + */ + private class GcWorker extends GridWorker { + /** + * @param gridName Grid name. + * @param name Name. + * @param log Log. + */ + protected GcWorker(@Nullable String gridName, String name, IgniteLogger log) { + super(gridName, name, log); + } + + /** {@inheritDoc} */ + @Override protected void body() throws InterruptedException, IgniteInterruptedException { + if (log.isDebugEnabled()) + log.debug("GC worker started."); + + File workTokDir = tokDir.getParentFile(); + + assert workTokDir != null; + + while (!isCancelled()) { + U.sleep(GC_FREQ); + + if (log.isDebugEnabled()) + log.debug("Starting GC iteration."); + + RandomAccessFile lockFile = null; + + FileLock lock = null; + + try { + lockFile = new RandomAccessFile(new File(workTokDir, LOCK_FILE_NAME), "rw"); + + lock = lockFile.getChannel().lock(); + + if (lock != null) + processTokenDirectory(workTokDir); + else if (log.isDebugEnabled()) + log.debug("Token directory is being processed concurrently: " + workTokDir.getAbsolutePath()); + } + catch (OverlappingFileLockException ignored) { + if (log.isDebugEnabled()) + log.debug("Token directory is being processed concurrently: " + workTokDir.getAbsolutePath()); + } + catch (IOException e) { + U.error(log, "Failed to process directory: " + workTokDir.getAbsolutePath(), e); + } + finally { + U.releaseQuiet(lock); + U.closeQuiet(lockFile); + } + + // Process spaces created by this endpoint. + if (log.isDebugEnabled()) + log.debug("Processing local spaces."); + + for (IpcSharedMemoryClientEndpoint e : endpoints) { + if (log.isDebugEnabled()) + log.debug("Processing endpoint: " + e); + + if (!e.checkOtherPartyAlive()) { + endpoints.remove(e); + + if (log.isDebugEnabled()) + log.debug("Removed endpoint: " + e); + } + } + } + } + + /** @param workTokDir Token directory (common for multiple nodes). */ + private void processTokenDirectory(File workTokDir) { + for (File f : workTokDir.listFiles()) { + if (!f.isDirectory()) { + if (!f.getName().equals(LOCK_FILE_NAME)) { + if (log.isDebugEnabled()) + log.debug("Unexpected file: " + f.getName()); + } + + continue; + } + + if (f.equals(tokDir)) { + if (log.isDebugEnabled()) + log.debug("Skipping own token directory: " + tokDir.getName()); + + continue; + } + + String name = f.getName(); + + int pid; + + try { + pid = Integer.parseInt(name.substring(name.lastIndexOf('-') + 1)); + } + catch (NumberFormatException ignored) { + if (log.isDebugEnabled()) + log.debug("Failed to parse file name: " + name); + + continue; + } + + // Is process alive? + if (IpcSharedMemoryUtils.alive(pid)) { + if (log.isDebugEnabled()) + log.debug("Skipping alive node: " + pid); + + continue; + } + + if (log.isDebugEnabled()) + log.debug("Possibly stale token folder: " + f); + + // Process each token under stale token folder. + File[] shmemToks = f.listFiles(); + + if (shmemToks == null) + // Although this is strange, but is reproducible sometimes on linux. + return; + + int rmvCnt = 0; + + try { + for (File f0 : shmemToks) { + if (log.isDebugEnabled()) + log.debug("Processing token file: " + f0.getName()); + + if (f0.isDirectory()) { + if (log.isDebugEnabled()) + log.debug("Unexpected directory: " + f0.getName()); + } + + // Token file format: gg-shmem-space-[auto_idx]-[other_party_pid]-[size] + String[] toks = f0.getName().split("-"); + + if (toks.length != 6) { + if (log.isDebugEnabled()) + log.debug("Unrecognized token file: " + f0.getName()); + + continue; + } + + int pid0; + int size; + + try { + pid0 = Integer.parseInt(toks[4]); + size = Integer.parseInt(toks[5]); + } + catch (NumberFormatException ignored) { + if (log.isDebugEnabled()) + log.debug("Failed to parse file name: " + name); + + continue; + } + + if (IpcSharedMemoryUtils.alive(pid0)) { + if (log.isDebugEnabled()) + log.debug("Skipping alive process: " + pid0); + + continue; + } + + if (log.isDebugEnabled()) + log.debug("Possibly stale token file: " + f0); + + IpcSharedMemoryUtils.freeSystemResources(f0.getAbsolutePath(), size); + + if (f0.delete()) { + if (log.isDebugEnabled()) + log.debug("Deleted file: " + f0.getName()); + + rmvCnt++; + } + else if (!f0.exists()) { + if (log.isDebugEnabled()) + log.debug("File has been concurrently deleted: " + f0.getName()); + + rmvCnt++; + } + else if (log.isDebugEnabled()) + log.debug("Failed to delete file: " + f0.getName()); + } + } + finally { + // Assuming that no new files can appear, since + if (rmvCnt == shmemToks.length) { + U.delete(f); + + if (log.isDebugEnabled()) + log.debug("Deleted empty token directory: " + f.getName()); + } + } + } + } + } + }
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ac05bb9c/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemorySpace.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemorySpace.java index 0000000,ba4be48..249d995 mode 000000,100644..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemorySpace.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemorySpace.java @@@ -1,0 -1,374 +1,374 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.ignite.internal.util.ipc.shmem; + + import org.apache.ignite.*; + import org.apache.ignite.internal.util.typedef.internal.*; + + import java.io.*; + import java.nio.*; + import java.util.concurrent.atomic.*; + import java.util.concurrent.locks.*; + + import static org.apache.ignite.IgniteSystemProperties.*; + + /** + * + */ + @SuppressWarnings({"PointlessBooleanExpression", "ConstantConditions"}) + public class IpcSharedMemorySpace implements Closeable { + /** Debug flag (enable for testing). */ - private static final boolean DEBUG = Boolean.getBoolean(GG_IPC_SHMEM_SPACE_DEBUG); ++ private static final boolean DEBUG = Boolean.getBoolean(IGNITE_IPC_SHMEM_SPACE_DEBUG); + + /** Shared memory segment size (operable). */ + private final int opSize; + + /** Shared memory native pointer. */ + private final long shmemPtr; + + /** Shared memory ID. */ + private final int shmemId; + + /** Semaphore set ID. */ + private final int semId; + + /** Local closed flag. */ + private final AtomicBoolean closed = new AtomicBoolean(); + + /** {@code True} if space has been closed. */ + private final boolean isReader; + + /** Lock to protect readers and writers from concurrent close. */ + private final ReadWriteLock lock = new ReentrantReadWriteLock(); + + /** */ + private final int writerPid; + + /** */ + private final int readerPid; + + /** */ + private final String tokFileName; + + /** */ + private final IgniteLogger log; + + /** + * This will allocate system resources for the space. + * + * @param tokFileName Token filename. + * @param writerPid Writer PID. + * @param readerPid Reader PID. + * @param size Size in bytes. + * @param reader {@code True} if reader. + * @param parent Parent logger. + * @throws IgniteCheckedException If failed. + */ + public IpcSharedMemorySpace(String tokFileName, int writerPid, int readerPid, int size, boolean reader, + IgniteLogger parent) throws IgniteCheckedException { + assert size > 0 : "Size cannot be less than 1 byte"; + + log = parent.getLogger(IpcSharedMemorySpace.class); + + opSize = size; + + shmemPtr = IpcSharedMemoryUtils.allocateSystemResources(tokFileName, size, DEBUG && log.isDebugEnabled()); + + shmemId = IpcSharedMemoryUtils.sharedMemoryId(shmemPtr); + semId = IpcSharedMemoryUtils.semaphoreId(shmemPtr); + + isReader = reader; + + this.tokFileName = tokFileName; + this.readerPid = readerPid; + this.writerPid = writerPid; + + if (DEBUG && log.isDebugEnabled()) + log.debug("Shared memory space has been created: " + this); + } + + /** + * This should be called in order to attach to already allocated system resources. + * + * @param tokFileName Token file name (for proper cleanup). + * @param writerPid Writer PID. + * @param readerPid Reader PID. + * @param size Size. + * @param reader Reader flag. + * @param shmemId Shared memory ID. + * @param parent Logger. + * @throws IgniteCheckedException If failed. + */ + public IpcSharedMemorySpace(String tokFileName, int writerPid, int readerPid, int size, boolean reader, + int shmemId, IgniteLogger parent) throws IgniteCheckedException { + assert size > 0 : "Size cannot be less than 1 byte"; + + log = parent.getLogger(IpcSharedMemorySpace.class); + + opSize = size; + isReader = reader; + this.shmemId = shmemId; + this.writerPid = writerPid; + this.readerPid = readerPid; + this.tokFileName = tokFileName; + + shmemPtr = IpcSharedMemoryUtils.attach(shmemId, DEBUG && log.isDebugEnabled()); + + semId = IpcSharedMemoryUtils.semaphoreId(shmemPtr); + } + + /** + * @param buf Buffer. + * @param off Offset. + * @param len Length. + * @param timeout Operation timeout in milliseconds ({@code 0} to wait forever). + * @throws IgniteCheckedException If space has been closed. + * @throws IpcSharedMemoryOperationTimedoutException If operation times out. + */ + public void write(byte[] buf, int off, int len, long timeout) throws IgniteCheckedException, + IpcSharedMemoryOperationTimedoutException { + assert buf != null; + assert len > 0; + assert buf.length >= off + len; + assert timeout >= 0; + + assert !isReader; + + lock.readLock().lock(); + + try { + if (closed.get()) + throw new IgniteCheckedException("Shared memory segment has been closed: " + this); + + IpcSharedMemoryUtils.writeSharedMemory(shmemPtr, buf, off, len, timeout); + } + finally { + lock.readLock().unlock(); + } + } + + /** + * @param buf Buffer. + * @param off Offset. + * @param len Length. + * @param timeout Operation timeout in milliseconds ({@code 0} to wait forever). + * @throws IgniteCheckedException If space has been closed. + * @throws IpcSharedMemoryOperationTimedoutException If operation times out. + */ + public void write(ByteBuffer buf, int off, int len, long timeout) throws IgniteCheckedException, + IpcSharedMemoryOperationTimedoutException { + assert buf != null; + assert len > 0; + assert buf.limit() >= off + len; + assert timeout >= 0; + assert !isReader; + + lock.readLock().lock(); + + try { + if (closed.get()) + throw new IgniteCheckedException("Shared memory segment has been closed: " + this); + + IpcSharedMemoryUtils.writeSharedMemoryByteBuffer(shmemPtr, buf, off, len, timeout); + } + finally { + lock.readLock().unlock(); + } + } + + /** + * Blocks until at least 1 byte is read. + * + * @param buf Buffer. + * @param off Offset. + * @param len Length. + * @param timeout Operation timeout in milliseconds ({@code 0} to wait forever). + * @return Read bytes count. + * @throws IgniteCheckedException If space has been closed. + * @throws IpcSharedMemoryOperationTimedoutException If operation times out. + */ + public int read(byte[] buf, int off, int len, long timeout) throws IgniteCheckedException, + IpcSharedMemoryOperationTimedoutException { + assert buf != null; + assert len > 0; + assert buf.length >= off + len; + + assert isReader; + + lock.readLock().lock(); + + try { + if (closed.get()) + throw new IgniteCheckedException("Shared memory segment has been closed: " + this); + + return (int) IpcSharedMemoryUtils.readSharedMemory(shmemPtr, buf, off, len, timeout); + } + finally { + lock.readLock().unlock(); + } + } + + /** + * Blocks until at least 1 byte is read. + * + * @param buf Buffer. + * @param off Offset. + * @param len Length. + * @param timeout Operation timeout in milliseconds ({@code 0} to wait forever). + * @return Read bytes count. + * @throws IgniteCheckedException If space has been closed. + * @throws IpcSharedMemoryOperationTimedoutException If operation times out. + */ + public int read(ByteBuffer buf, int off, int len, long timeout) throws IgniteCheckedException, + IpcSharedMemoryOperationTimedoutException { + assert buf != null; + assert len > 0; + assert buf.capacity() >= off + len; + assert isReader; + + lock.readLock().lock(); + + try { + if (closed.get()) + throw new IgniteCheckedException("Shared memory segment has been closed: " + this); + + return (int) IpcSharedMemoryUtils.readSharedMemoryByteBuffer(shmemPtr, buf, off, len, timeout); + } + finally { + lock.readLock().unlock(); + } + } + + /** {@inheritDoc} */ + @Override public void close() { + close0(false); + } + + /** + * Forcibly closes the space and frees all system resources. + * <p> + * This method should be called with caution as it may result to the other-party + * process crash. It is intended to call when there was an IO error during handshake + * and other party has not yet attached to the space. + */ + public void forceClose() { + close0(true); + } + + /** + * @return Shared memory ID. + */ + public int sharedMemoryId() { + return shmemId; + } + + /** + * @return Semaphore set ID. + */ + public int semaphoreId() { + return semId; + } + + /** + * @param force {@code True} to close the space. + */ + private void close0(boolean force) { + if (!closed.compareAndSet(false, true)) + return; + + IpcSharedMemoryUtils.ipcClose(shmemPtr); + + // Wait all readers and writes to leave critical section. + lock.writeLock().lock(); + + try { + IpcSharedMemoryUtils.freeSystemResources(tokFileName, shmemPtr, force); + } + finally { + lock.writeLock().unlock(); + } + + if (DEBUG && log.isDebugEnabled()) + log.debug("Shared memory space has been closed: " + this); + } + + /** + * @return Bytes available for read. + * @throws IgniteCheckedException If failed. + */ + public int unreadCount() throws IgniteCheckedException { + lock.readLock().lock(); + + try { + if (closed.get()) + throw new IgniteCheckedException("Shared memory segment has been closed: " + this); + + return IpcSharedMemoryUtils.unreadCount(shmemPtr); + } + finally { + lock.readLock().unlock(); + } + } + + /** + * @return Shared memory pointer. + */ + public long sharedMemPointer() { + return shmemPtr; + } + + /** + * @return Reader PID. + */ + public int readerPid() { + return readerPid; + } + + /** + * @return Writer PID. + */ + public int writerPid() { + return writerPid; + } + + /** + * @return Vis-a-vis PID. + */ + public int otherPartyPid() { + return isReader ? writerPid : readerPid; + } + + /** + * @return Token file name used to create shared memory space. + */ + public String tokenFileName() { + return tokFileName; + } + + /** + * @return Space size. + */ + public int size() { + return opSize; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IpcSharedMemorySpace.class, this, "closed", closed.get()); + } + } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ac05bb9c/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/IgniteOptimizedMarshaller.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ac05bb9c/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ac05bb9c/modules/core/src/test/java/org/apache/ignite/internal/util/GridStartupWithSpecifiedWorkDirectorySelfTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ac05bb9c/modules/core/src/test/java/org/apache/ignite/internal/util/GridStartupWithUndefinedGridGainHomeSelfTest.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/internal/util/GridStartupWithUndefinedGridGainHomeSelfTest.java index 095bd2d,a56f8b2..8fae702 --- a/modules/core/src/test/java/org/apache/ignite/internal/util/GridStartupWithUndefinedGridGainHomeSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/GridStartupWithUndefinedGridGainHomeSelfTest.java @@@ -29,10 -29,10 +29,10 @@@ import org.apache.ignite.internal.util. import org.apache.ignite.testframework.junits.common.*; import static org.apache.ignite.IgniteSystemProperties.*; - import static org.apache.ignite.internal.util.GridUtils.*; + import static org.apache.ignite.internal.util.IgniteUtils.*; /** - * Checks that node can be started without operations with undefined GRIDGAIN_HOME. + * Checks that node can be started without operations with undefined IGNITE_HOME. * <p> * Notes: * 1. The test intentionally extends JUnit {@link TestCase} class to make the test http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ac05bb9c/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/deploy/VisorDeployCommand.scala ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ac05bb9c/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/start/VisorStartCommand.scala ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ac05bb9c/modules/visor-console/src/main/scala/org/apache/ignite/visor/visor.scala ----------------------------------------------------------------------