http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ae3ba8c9/ipc/shmem/igniteshmem/org_apache_ignite_internal_util_ipc_shmem_IpcSharedMemoryUtils.cpp ---------------------------------------------------------------------- diff --git a/ipc/shmem/igniteshmem/org_apache_ignite_internal_util_ipc_shmem_IpcSharedMemoryUtils.cpp b/ipc/shmem/igniteshmem/org_apache_ignite_internal_util_ipc_shmem_IpcSharedMemoryUtils.cpp new file mode 100644 index 0000000..eaa13fd --- /dev/null +++ b/ipc/shmem/igniteshmem/org_apache_ignite_internal_util_ipc_shmem_IpcSharedMemoryUtils.cpp @@ -0,0 +1,882 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "org_apache_ignite_internal_util_ipc_shmem_IpcSharedMemoryUtils.h" + +#include <cstdio> +#include <cstdlib> +#include <cstring> +#include <cerrno> +#include <sys/types.h> +#include <sys/ipc.h> +#include <sys/shm.h> +#include <sys/sem.h> +#include <sys/stat.h> +#include <string> +#include <iostream> +#include <time.h> +#include <sys/time.h> +#include <unistd.h> +#include <signal.h> + +using namespace std; + +/** IgniteCheckedException JNI class name. */ +const char* GRID_EXCEPTION = "org/apache/ignite/IgniteCheckedException"; + +/** GridIpcSharedMemoryOperationTimedoutException JNI class name. */ +const char* OP_TIMEDOUT_EXCEPTION = "org/apache/ignite/internal/util/ipc/shmem/GridIpcSharedMemoryOperationTimedoutException"; + +/** GridIpcOutOfSystemResourcesException JNI class name. */ +const char* OUT_OF_RSRCS_EXCEPTION = "org/apache/ignite/internal/util/ipc/shmem/GridIpcOutOfSystemResourcesException"; + +/** Global flag for enabling debug logging. */ +static bool __GG_DEBUG = false; + +/** Read semaphore ID. */ +#define SEM_READ 0 + +/** Write semaphore ID. */ +#define SEM_WRITE 1 + +/** + * Logging macro. + * + * @param m Logging message with optional formatting symbols. + * @param varagrs Formatting arguments. + */ +#define GG_LOG_DEBUG(m, ...) {\ + if(__GG_DEBUG)\ + log(__FILE__, __LINE__, __FUNCTION__, m, __VA_ARGS__);\ +} + +/** Buffer size for current time string. */ +#define TIME_NOW_BUF_SIZE 1024 + +/** Buffer size for debug message. */ +#define FORMAT_LOG_BUF_SIZE 4096 + +/** + * @return Current time string in format: year-month-day hour:minute:second. + */ +static string timeNow() { + timeval tv; + tm lt; + + char timebuf[TIME_NOW_BUF_SIZE]; + + gettimeofday(&tv, 0); + + time_t now = tv.tv_sec; + localtime_r(&now, <); + + // Clone the format used by log4j ISO8601DateFormat, + // specifically: "yyyy-MM-dd HH:mm:ss.SSS" + size_t len = strftime(timebuf, TIME_NOW_BUF_SIZE, "%Y-%m-%d %H:%M:%S", <); + + snprintf(timebuf + len, TIME_NOW_BUF_SIZE - len, ".%03d", (int) (tv.tv_usec / 1000)); + + return string(timebuf); +} + +/** + * Writes debug message to standard output, if global debug flag is enabled. + * + * @param file Source file, from which the message originates. + * @param line Source line, from which the message originates. + * @param funcName Name of the function, from which the message originates. + * @param format Message string with optional formatting symbols. + * @param varargs Formatting arguments. + */ +static void log(const char* file, int line, const char* funcName, const char* format, ...) { + static pid_t pid = getpid(); + + char msgbuf[FORMAT_LOG_BUF_SIZE]; + va_list va; + + va_start(va, format); + vsnprintf(msgbuf, FORMAT_LOG_BUF_SIZE - 1, format, va); + va_end(va); + + cout << timeNow() << " pid:" << pid << " " << file << ":" << funcName << ":" << line << ": " << msgbuf << endl; + + flush(cout); +} + +/** Lock operation on semaphore #0. */ +struct sembuf op_lock0[2] = { + 0, 0, 0, // Wait until semaphore #0 becomes 0. + 0, 1, 0 // Then increment semaphore #0 by 1. +}; + +/** Lock operation on semaphore #1. */ +struct sembuf op_lock1[2] = { + 1, 0, 0, // Wait until semaphore #1 becomes 0. + 1, 1, 0 // Then increment semaphore #1 by 1. +}; + +/** + * Data offset in shared memory buffer (the memory segment preceding data + * is used for IPC data). + */ +#define BUF_OFFSET 64 + +/** + * IPC data, that is used for inter-process communication. + */ +typedef struct { + /** Number of parties that have closed the connection (0, 1, or 2). */ + int closedCnt; + + /** Shared memory segment ID. */ + int shmId; + + /** Semaphore set ID. */ + int semId; + + /** Shared memory segment size. */ + int size; + + /** Closed flag. */ + volatile bool closed; + + /** Read position. */ + volatile unsigned int readCnt; + + /** Write position (should be always >= readCnt). */ + volatile unsigned int writeCnt; + + /** Flag, indicating that reader is waiting on semaphore. */ + volatile bool readBlocked; + + /** Flag, indicating that writer is waiting on semaphore. */ + volatile bool writeBlocked; +} T_IpcData; + +/** + * Calculates unread bytes count, given IPC data pointer. + * + * @param ipcData IPC data pointer. + * @param fetchWriteCnt True to fetch write count or read it normally. + * @param fetchReadCnt True to fetch read count or read it normally.. + * @return Unread bytes count. + */ +static unsigned int getUnreadCount(T_IpcData *ipcData, bool fetchWriteCnt, bool fetchReadCnt) { + unsigned int writeCnt = fetchWriteCnt ? __sync_fetch_and_add(&ipcData->writeCnt, 0) : ipcData->writeCnt; + unsigned int readCnt = fetchReadCnt ? __sync_fetch_and_add(&ipcData->readCnt, 0) : ipcData->readCnt; + + unsigned int unreadCnt = writeCnt - readCnt; + + if (unreadCnt < 0) { + GG_LOG_DEBUG("Unread count failed [writeCnt=%u, readCnt=%u]", writeCnt, readCnt); + + *((char *) 0) = 5; + } + + return unreadCnt; +} + +/** + * Throws exception in Java code. + * + * @param env JNI environment. + * @param clsName Exception class full name (slashed notation). + */ +static void throwException(JNIEnv* env, const char* clsName) { + // We assume that 512 bytes will be enough. + char msg[512]; + + ::sprintf(msg, "%s (error code: %d).", ::strerror(errno), errno); + + env->ThrowNew(env->FindClass(clsName), msg); +} + +/** + * Throws exception in Java code according to current + * errno value. + * + * @param env JNI environment. + */ +static void throwExceptionByErrno(JNIEnv* env) { + switch (errno) { + case ENOMEM: + case EMFILE: + case ENOSPC: + throwException(env, OUT_OF_RSRCS_EXCEPTION); + break; + + default: + throwException(env, GRID_EXCEPTION); + break; + } +} + +/** + * Initializes semaphore. + * + * @param env JNI environment. + * @param semId Semaphore set ID. + * @param semNum Semaphore number in semaphore set. + */ +static bool semInit(JNIEnv* env, int semId, int semNum) { + struct sembuf sb; + memset(&sb, 0, sizeof(sb)); + + // Initialize the semaphore. + sb.sem_op = 1; + sb.sem_num = semNum; + + if (::semop(semId, &sb, 1) == -1) { + GG_LOG_DEBUG("Semaphore init failed [semId=%d, semNum=%s, errno=%d]", semId, + semNum == SEM_READ ? "SEM_READ" : "SEM_WRITE", errno); + + throwException(env, GRID_EXCEPTION); + + return false; + } + + return true; +} + +/** + * Waits on semaphore until another process has signaled or the semaphore + * has been removed. + * + * @param env JNI environment. + * @param semId Semaphore set ID. + * @param semNum Semaphore number in semaphore set. + * @param timeout Timeout for wait operation, if supported on current platform. + * @param ipcData IPC data pointer. + * @see semNotify() + */ +static void semWait(JNIEnv * env, int semId, int semNum, int timeout, T_IpcData *ipcData) { + while (1) { + int ret; +#ifdef HAVE_SEMTIMEDOP + _STRUCT_TIMESPEC timeout0 = { + 0, timeout * 1000 + }; + ret = semtimedop(semId, semNum == 0 ? op_lock0 : op_lock1, 2, timeout > 0 ? &timeout0 : NULL); +#else + ret = semop(semId, semNum == 0 ? op_lock0 : op_lock1, 2); +#endif + if (ret == 0) + return; + + if (errno == EIDRM || errno == EINVAL) { // Semaphore was removed while waiting. + if (!ipcData->closed) { + GG_LOG_DEBUG("Semaphore removed, but the space is not closed [semId=%d]", semId); + + ipcData->closed = true; + } + + return; + } + + GG_LOG_DEBUG("Semaphore wait failed [semId=%d, semNum=%s, errno=%d]", semId, + semNum == SEM_READ ? "SEM_READ" : "SEM_WRITE", errno); + + if (errno == EINTR) { + // spin again + } + else if (errno == EAGAIN) { + throwException(env, OP_TIMEDOUT_EXCEPTION); + + return; + } + else { + throwException(env, GRID_EXCEPTION); + } + } +} + +/** + * Notifies the semaphore to signal other process waiting on this semaphore + * to resume execution. + * + * @param env JNI environment. + * @param semId Semaphore set ID. + * @param semNum Semaphore number in semaphore set. + * @param ipcData IPC data pointer. + * @see semWait() + */ +static void semNotify(JNIEnv * env, int semId, int semNum, T_IpcData *ipcData) { + if (::semctl(semId, semNum, SETVAL, 0) == -1) { + if (errno == EIDRM || errno == EINVAL) { + if (!ipcData->closed) { + GG_LOG_DEBUG("Semaphore removed, but the space is not closed [semId=%d]", semId); + + ipcData->closed = true; + } + + return; + } + + GG_LOG_DEBUG("Semaphore wait failed [semId=%d, semNum=%s, errno=%d]", semId, + semNum == SEM_READ ? "SEM_READ" : "SEM_WRITE", errno); + + throwException(env, GRID_EXCEPTION); + } +} + +/** + * Allocates shared memory segment and semaphores for inter-process communication (JNI method). + * + * @param env JNI environment. + * @param jTokFileName Token file name for allocating resources. + * @param size Shared memory segment size in bytes. + * @param debug Debug flag, which modifies the global debug flag. This parameter is expected + * to be always the same during application lifetime. + */ +jlong Java_org_apache_ignite_internal_util_ipc_shmem_IpcSharedMemoryUtils_allocateSystemResources( + JNIEnv * env, jclass, jstring jTokFileName, jint size, jboolean debug) { + int key = -1; + int shmId = -1; + int semId = -1; + jboolean isCopy = false; + + // Copy to STL string and release. + const char* tokFileName0 = env->GetStringUTFChars(jTokFileName, &isCopy); + + string tokFileName(tokFileName0); + + env->ReleaseStringUTFChars(jTokFileName, tokFileName0); + + // Set global debug flag. + __GG_DEBUG = debug; + + // Get system token, used in IPC. + if ((key = ::ftok(tokFileName.c_str(), 'G')) == -1) { + throwException(env, GRID_EXCEPTION); + + return 0; + } + + // Get shared memory descriptor (create shared memory segment if absent). + if ((shmId = ::shmget(key, size + BUF_OFFSET, 0666 | IPC_CREAT)) == -1) { + throwExceptionByErrno(env); + + return 0; + } + + void* data = ::shmat(shmId, (void *) 0, 0); + +#ifndef __APPLE__ + // Shared memory segment will be deleted upon last process detach. + ::shmctl(shmId, IPC_RMID, NULL); +#else + GG_LOG_DEBUG("Will not mark shared memory region for deletion (will be removed on close): %d", shmId); +#endif + + if ((ptrdiff_t) data == -1) { + // Exception will be thrown on return. + throwExceptionByErrno(env); + + return 0; + } + + T_IpcData *ipcData = (T_IpcData*) data; + + // Allocate semaphores for native synchronization. + if ((semId = ::semget(key, 2, 0666 | IPC_CREAT)) == -1) { + // Exception will be thrown on return. + throwExceptionByErrno(env); + + // Cleanup ignoring possible errors. + ::shmdt(ipcData); + + return 0; + } + + // Initialize SEM_READ and SEM_WRITE to 1. + if (!semInit(env, semId, SEM_READ) || !semInit(env, semId, SEM_WRITE)) { + // Exception will be thrown on return. + throwException(env, GRID_EXCEPTION); + + // Cleanup ignoring possible errors. + ::semctl(semId, 0, IPC_RMID); + ::shmdt(ipcData); + + return 0; + } + + // Initialize data structure. + memset(ipcData, 0, sizeof(*ipcData)); + ipcData->shmId = shmId; + ipcData->semId = semId; + ipcData->size = size; + + return (jlong) (((char*) data) + BUF_OFFSET); +} + +/** + * Attaches to an existing shared memory segment (JNI method). + * + * @param env JNI environment. + * @param shmId Shared memory segment ID. + * @param debug Debug flag, which modifies the global debug flag. This parameter is expected + * to be always the same during application lifetime. + */ +jlong Java_org_apache_ignite_internal_util_ipc_shmem_IpcSharedMemoryUtils_attach(JNIEnv * env, jclass, + jint shmId, jboolean debug) { + // Set global debug flag. + __GG_DEBUG = debug; + + void* data = ::shmat(shmId, (void *) 0, 0); + + GG_LOG_DEBUG("Attaching to shmId: %d", shmId); + + if ((ptrdiff_t) data == -1) { + // Exception will be thrown on return. + throwExceptionByErrno(env); + + return 0; + } + + T_IpcData *ipcData = (T_IpcData*) data; + + return (jlong) (((char*) data) + BUF_OFFSET); +} + +/** + * Shuts down inter-process communication (JNI method). + * + * @param env JNI environment. + * @param buf Data buffer pointer in shared memory segment. + */ +void Java_org_apache_ignite_internal_util_ipc_shmem_IpcSharedMemoryUtils_ipcClose(JNIEnv *env, jclass, jlong buf) { + T_IpcData *ipcData = (T_IpcData*) (((char *) buf) - BUF_OFFSET); + + // Set closed flag to true using memory barrier. + // This is to ensure the flag is set BEFORE we notify + // the semaphores (i.e. no reordering will happen). + __sync_fetch_and_add(&ipcData->closed, 1); + + // Remove semaphore. + if (::semctl(ipcData->semId, 0, IPC_RMID) == -1) { + if (__GG_DEBUG) + cerr << "Failed to remove semaphore: " << errno << ": " << strerror(errno) << endl << flush; + + if (errno == EPERM) { // Operation not permitted (no rights). + // Signal both reader and writer (because we don't know who we are). + // The other side will remove the semaphore. + semNotify(env, ipcData->semId, SEM_READ, ipcData); + semNotify(env, ipcData->semId, SEM_WRITE, ipcData); + } + } +} + +/** + * Detaches from shared memory segment and removes the token file. + * + * @param env JNI environment. + * @param jTokFileName Token file name for allocating resources. + * @param buf Data buffer pointer in shared memory segment. + * @param force Force flag for forcing resources removal. + * + */ +void Java_org_apache_ignite_internal_util_ipc_shmem_IpcSharedMemoryUtils_freeSystemResources__Ljava_lang_String_2JZ( + JNIEnv* env, jclass, jstring jTokFileName, jlong buf, jboolean force) { + T_IpcData *ipcData = (T_IpcData*) (((char *) buf) - BUF_OFFSET); + + if (__sync_bool_compare_and_swap(&ipcData->closedCnt, 0, 1) && !force) { + ::shmdt(ipcData); + + return; + } + +#ifdef __APPLE__ + int shmId = ipcData->shmId; +#endif + + // Detach from shared memory (shared memory segment will be deleted upon last detach). + if (::shmdt(ipcData) == -1) { + // If error occurred, then return. + return; + } + +#ifdef __APPLE__ + GG_LOG_DEBUG("Deleting shared memory region: %d", shmId); + + ::shmctl(shmId, IPC_RMID, NULL); +#endif + + jboolean isCopy = false; + + // Copy to STL string and release. + const char* tokFileName0 = env->GetStringUTFChars(jTokFileName, &isCopy); + + string tokFileName(tokFileName0); + + env->ReleaseStringUTFChars(jTokFileName, tokFileName0); + + ::remove(tokFileName.c_str()); +} + +/** + * Removes semaphores and shared memory segment. + * + * @param env JNI environment. + * @param jTokFileName Token file name for allocating resources. + * @param size Shared memory segment size in bytes. + */ +void Java_org_apache_ignite_internal_util_ipc_shmem_IpcSharedMemoryUtils_freeSystemResources__Ljava_lang_String_2I( + JNIEnv* env, jclass, jstring jTokFileName, jint size) { + int key = -1; + int shmId = -1; + int semId = -1; + jboolean isCopy = false; + + // Copy to STL string and release. + const char* tokFileName0 = env->GetStringUTFChars(jTokFileName, &isCopy); + + string tokFileName(tokFileName0); + + env->ReleaseStringUTFChars(jTokFileName, tokFileName0); + + // Get system token, used in IPC. + if ((key = ::ftok(tokFileName.c_str(), 'G')) == -1) { + return; + } + + // Get semaphores for native synchronization (no create). + if ((semId = ::semget(key, 2, 0666)) > 0) { + // Remove semaphores if present. + ::semctl((int) semId, 0, IPC_RMID); + } + + // Get shared memory descriptor (no create). + if ((shmId = ::shmget(key, size, 0666)) == -1) { + // This means that shared memory segment was not created or was removed. + // No point to continue, as semaphores do not exist as well. + return; + } + + // Remove shared memory segment (ignoring possible errors). + ::shmctl((int) shmId, IPC_RMID, NULL); +} + +/** + * Read-write operations for shared memory segments. + */ +class RW { +public: + /** + * Copies data from shared memory to the destination buffer. + * + * @param env JNI environment. + * @param dest Destination buffer. + * @param dOffset Destination buffer offset. + * @param len Number of bytes to copy. + * @param src Source pointer in shared memory segment. + */ + static void FromShMem(JNIEnv *env, jbyteArray dest, jlong dOffset, jlong len, void *src) { + env->SetByteArrayRegion(dest, dOffset, len, (jbyte*) src); + } + + /** + * Copies data from shared memory to the destination Java object. + * + * @param env JNI environment. + * @param dest Destination Java object. + * @param dOffset Destination object offset. + * @param len Number of bytes to copy. + * @param src Source pointer in shared memory segment. + */ + static void FromShMem(JNIEnv *env, jobject dest, jlong dOffset, jlong len, void *src) { + char *destAddr = ((char *) env->GetDirectBufferAddress(dest)) + dOffset; + memcpy((void*) destAddr, src, len); + } + + /** + * Copies data from buffer to shared memory. + * + * @param env JNI environment. + * @param src Source buffer. + * @param sOffset Source buffer offset. + * @param len Number of bytes to copy. + * @param dest Destination pointer. + */ + static void ToShMem(JNIEnv *env, jbyteArray src, jlong sOffset, jlong len, void *dest) { + env->GetByteArrayRegion(src, sOffset, len, (jbyte*) dest); + } + + /** + * Copies data from Java object to shared memory. + * + * @param env JNI environment. + * @param src Source Java object. + * @param sOffset Source Java object offset. + * @param len Number of bytes to copy. + * @param dest Destination pointer. + */ + static void ToShMem(JNIEnv *env, jobject src, jlong sOffset, jlong len, void *dest) { + char *srcAddr = ((char *) env->GetDirectBufferAddress(src)) + sOffset; + memcpy(dest, (void*) srcAddr, len); + } +}; + +/** + * Helper method for copying data from shared memory. + * + * @param env JNI environment. + * @param shMemPtr Data pointer in shared memory segment. + * @param dest Destination object to copy data to. + * @param dOffset Destination object write offset. + * @param len Number of bytes to copy. + * @param timeout Operation timeout in milliseconds. + * @param <T> Destination object type. + */ +template<class T> +jlong Java_org_apache_ignite_internal_util_ipc_shmem_IpcSharedMemoryUtils_ReadShMem(JNIEnv *env, jclass, jlong shMemPtr, + T dest, jlong dOffset, jlong len, jlong timeout) { + T_IpcData *ipcData = (T_IpcData*) (((char *) shMemPtr) - BUF_OFFSET); + + unsigned int unreadCnt = getUnreadCount(ipcData, true, false); + + while (unreadCnt == 0) { + if (unreadCnt == 0 && ipcData->closed) { + return -1; + } + + // signal the other party, if it's blocked + if (ipcData->writeBlocked) { + GG_LOG_DEBUG("Before write semaphore notification [semId=%d]", ipcData->semId); + + semNotify(env, ipcData->semId, SEM_WRITE, ipcData); + } + + GG_LOG_DEBUG("Before read semaphore wait [semId=%d]", ipcData->semId); + + ipcData->readBlocked = 1; + semWait(env, ipcData->semId, SEM_READ, timeout, ipcData); + ipcData->readBlocked = 0; + + unreadCnt = getUnreadCount(ipcData, true, false); + } + + int bytesRead = 0; + + while (unreadCnt > 0 && bytesRead < len) { + int pos = ipcData->readCnt % ipcData->size; + int len0 = (ipcData->size - pos < unreadCnt) ? ipcData->size - pos : unreadCnt; + + if (len0 > len - bytesRead) { + len0 = len - bytesRead; + } + + RW::FromShMem(env, dest, dOffset + bytesRead, len0, (void*) (shMemPtr + pos)); + + __sync_add_and_fetch(&ipcData->readCnt, len0); + + GG_LOG_DEBUG("Updated read count [readCnt=%d]", ipcData->readCnt); + + bytesRead += len0; + + GG_LOG_DEBUG("Before write semaphore notification [semId=%d]", ipcData->semId); + + semNotify(env, ipcData->semId, SEM_WRITE, ipcData); + + unreadCnt = getUnreadCount(ipcData, true, false); + } + + return bytesRead; +} + +/** + * Helper method for copying data to shared memory. + * + * @param env JNI environment. + * @param shMemPtr Data pointer in shared memory segment. + * @param src Source object to copy data from. + * @param dOffset Destination object read offset. + * @param len Number of bytes to copy. + * @param timeout Operation timeout in milliseconds. + * @param <T> Source object type. + */ +template<class T> +void Java_org_apache_ignite_internal_util_ipc_shmem_IpcSharedMemoryUtils_WriteShMem(JNIEnv *env, jclass, + jlong shMemPtr, T src, jlong sOffset, jlong len, jlong timeout) { + T_IpcData *ipcData = (T_IpcData*) (((char *) shMemPtr) - BUF_OFFSET); + + int bytesWritten = 0; + + while (bytesWritten < len) { + // Wait for reader. + unsigned int unreadCnt = getUnreadCount(ipcData, false, true); + int pos = ipcData->writeCnt % ipcData->size; + + while (unreadCnt == ipcData->size) { + if (ipcData->closed) { + env->ThrowNew(env->FindClass(GRID_EXCEPTION), "Shared memory segment has been closed."); + + return; + } + + // signal the other party, if it's blocked + if (ipcData->readBlocked) { + semNotify(env, ipcData->semId, SEM_READ, ipcData); + } + + ipcData->writeBlocked = 1; + semWait(env, ipcData->semId, SEM_WRITE, timeout, ipcData); + ipcData->writeBlocked = 0; + + unreadCnt = getUnreadCount(ipcData, false, true); + } + + int len0 = ipcData->size - ((pos > unreadCnt) ? pos : unreadCnt); + + if (len0 > len - bytesWritten) { + len0 = len - bytesWritten; + } + + if (ipcData->closed) { + env->ThrowNew(env->FindClass(GRID_EXCEPTION), "Shared memory segment has been closed"); + + return; + } + + RW::ToShMem(env, src, sOffset + bytesWritten, len0, (void*) (shMemPtr + pos)); + + __sync_add_and_fetch(&ipcData->writeCnt, len0); + + GG_LOG_DEBUG("Updated write count [readCnt=%d]", ipcData->readCnt); + + bytesWritten += len0; + + semNotify(env, ipcData->semId, SEM_READ, ipcData); + } +} + +/** + * Copies data from Java byte array to shared memory (JNI method). + * + * @param env JNI environment. + * @param clsName Java class name. + * @param shMemPtr Data pointer in shared memory segment. + * @param src Source Java byte array. + * @param dOffset Source Java byte array offset. + * @param len Number of bytes to copy. + * @param timeout Operation timeout in milliseconds. + */ +void Java_org_apache_ignite_internal_util_ipc_shmem_IpcSharedMemoryUtils_writeSharedMemory(JNIEnv *env, jclass clsName, + jlong shMemPtr, jbyteArray src, jlong sOffset, jlong len, jlong timeout) { + Java_org_apache_ignite_internal_util_ipc_shmem_IpcSharedMemoryUtils_WriteShMem<jbyteArray>(env, clsName, shMemPtr, + src, sOffset, len, timeout); +} + +/** + * Copies data from Java byte array to shared memory (JNI method). + * + * @param env JNI environment. + * @param clsName Java class name. + * @param shMemPtr Data pointer in shared memory segment. + * @param src Source Java object. + * @param dOffset Source Java object offset. + * @param len Number of bytes to copy. + * @param timeout Operation timeout in milliseconds. + */ +void Java_org_apache_ignite_internal_util_ipc_shmem_IpcSharedMemoryUtils_writeSharedMemoryByteBuffer(JNIEnv *env, + jclass clsName, jlong shMemPtr, jobject src, jlong sOffset, jlong len, jlong timeout) { + Java_org_apache_ignite_internal_util_ipc_shmem_IpcSharedMemoryUtils_WriteShMem<jobject>(env, clsName, shMemPtr, + src, sOffset, len, timeout); +} + +/** + * Copies data from shared memory to Java byte array (JNI method). + * + * @param env JNI environment. + * @param clsName Java class name. + * @param shMemPtr Data pointer in shared memory segment. + * @param dest Destination Java byte array. + * @param dOffset Destination Java byte array offset. + * @param size Number of bytes to copy. + * @param timeout Operation timeout in milliseconds. + */ +jlong Java_org_apache_ignite_internal_util_ipc_shmem_IpcSharedMemoryUtils_readSharedMemory(JNIEnv *env, jclass clsName, + jlong shMemPtr, jbyteArray dest, jlong dOffset, jlong size, jlong timeout) { + return Java_org_apache_ignite_internal_util_ipc_shmem_IpcSharedMemoryUtils_ReadShMem<jbyteArray>(env, clsName, shMemPtr, + dest, dOffset, size, timeout); +} + +/** + * Copies data from shared memory to Java object (JNI method). + * + * @param env JNI environment. + * @param clsName Java class name. + * @param shMemPtr Data pointer in shared memory segment. + * @param dest Destination Java object. + * @param dOffset Destination Java object offset. + * @param size Number of bytes to copy. + * @param timeout Operation timeout in milliseconds. + */ +jlong Java_org_apache_ignite_internal_util_ipc_shmem_IpcSharedMemoryUtils_readSharedMemoryByteBuffer(JNIEnv *env, + jclass clsName, jlong shMemPtr, jobject dest, jlong dOffset, jlong size, jlong timeout) { + return Java_org_apache_ignite_internal_util_ipc_shmem_IpcSharedMemoryUtils_ReadShMem<jobject>(env, clsName, shMemPtr, + dest, dOffset, size, timeout); +} + +/** + * Gets the number of unread bytes in shared memory segment (JNI method). + * + * @param shMemPtr Data pointer in shared memory segment. + * @return Number of uneread bytes. + */ +jint Java_org_apache_ignite_internal_util_ipc_shmem_IpcSharedMemoryUtils_unreadCount(JNIEnv*, jclass, jlong shMemPtr) { + T_IpcData *ipcData = (T_IpcData*) (((char *) shMemPtr) - BUF_OFFSET); + + return getUnreadCount(ipcData, true, true); +} + +/** + * Checks if the counterpart is alive (JNI method). + * + * @param pid Process ID of the counterpart. + * @return true if couterpart is alive, false otherwise. + */ +jboolean Java_org_apache_ignite_internal_util_ipc_shmem_IpcSharedMemoryUtils_alive(JNIEnv*, jclass, jint pid) { + int res = kill((int) pid, 0); + + // Return true if signal was sent or there is no permission to send signal to this process. + // If kill failed with error and errno is ESRCH or EINVAL, process is considered to be dead. + return res == 0 || errno == EPERM; +} + +/** + * Gets the shared memory segment ID for a given shared memory data pointer (JNI method). + * + * @param shMemPtr Shared memory data pointer. + * @return Shared memory ID. + */ +jint Java_org_apache_ignite_internal_util_ipc_shmem_IpcSharedMemoryUtils_sharedMemoryId(JNIEnv*, jclass, jlong shMemPtr) { + T_IpcData *ipcData = (T_IpcData*) (((char *) shMemPtr) - BUF_OFFSET); + + return ipcData->shmId; +} + +/** + * Gets the semaphore set ID for a given shared memory data pointer (JNI method). + * + * @param shMemPtr Shared memory data pointer. + * @return Semaphore set ID. + */ +jint Java_org_apache_ignite_internal_util_ipc_shmem_IpcSharedMemoryUtils_semaphoreId(JNIEnv*, jclass, jlong shMemPtr) { + T_IpcData *ipcData = (T_IpcData*) (((char *) shMemPtr) - BUF_OFFSET); + + return ipcData->semId; +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ae3ba8c9/ipc/shmem/readme.txt ---------------------------------------------------------------------- diff --git a/ipc/shmem/readme.txt b/ipc/shmem/readme.txt index ebc64c8..59e3b4c 100644 --- a/ipc/shmem/readme.txt +++ b/ipc/shmem/readme.txt @@ -26,7 +26,7 @@ Usage with GridGain ------------------- Copy compiled library to folder that already listed in 'java.library.path' -with name in form: 'libggshmem-<gridgain-version>.<extention>'. +with name in form: 'libigniteshmem-<gridgain-version>.<extention>'. Note: Grid should be restarted. ************************************************************************************** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ae3ba8c9/modules/core/src/main/java/META-INF/native/linux64/libggshmem.so ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/META-INF/native/linux64/libggshmem.so b/modules/core/src/main/java/META-INF/native/linux64/libggshmem.so index d39a83e..7dd28f0 100755 Binary files a/modules/core/src/main/java/META-INF/native/linux64/libggshmem.so and b/modules/core/src/main/java/META-INF/native/linux64/libggshmem.so differ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ae3ba8c9/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/GridIpcSharedMemoryNativeLoader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/GridIpcSharedMemoryNativeLoader.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/GridIpcSharedMemoryNativeLoader.java index ef310ec..4ad95d4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/GridIpcSharedMemoryNativeLoader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/GridIpcSharedMemoryNativeLoader.java @@ -36,10 +36,10 @@ public class GridIpcSharedMemoryNativeLoader { private static volatile boolean loaded; /** Library name base. */ - private static final String LIB_NAME_BASE = "ggshmem"; + private static final String LIB_NAME_BASE = "igniteshmem"; /** Lock file path. */ - private static final File LOCK_FILE = new File(System.getProperty("java.io.tmpdir"), "ggshmem.lock"); + private static final File LOCK_FILE = new File(System.getProperty("java.io.tmpdir"), "igniteshmem.lock"); /** Library name. */ static final String LIB_NAME = LIB_NAME_BASE + "-" + GridProductImpl.VER; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ae3ba8c9/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryUtils.java index 8fb7c6a..ec0566e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryUtils.java @@ -185,7 +185,7 @@ public class IpcSharedMemoryUtils { * @return Wrapping grid exception. */ static IgniteCheckedException linkError(UnsatisfiedLinkError e) { - return new IgniteCheckedException("Linkage error due to possible native library, libggshmem.so, " + + return new IgniteCheckedException("Linkage error due to possible native library, libigniteshmem.so, " + "version mismatch (stop all grid nodes, clean up your '/tmp' folder, and try again).", e); }