# ignite-496
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/5fa19eb6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/5fa19eb6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/5fa19eb6 Branch: refs/heads/ignite-496 Commit: 5fa19eb6c3227f72eec33c36904ddf1b76f5fca8 Parents: 97f0c03 Author: sboikov <sboi...@gridgain.com> Authored: Thu Mar 19 17:56:48 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Thu Mar 19 17:56:48 2015 +0300 ---------------------------------------------------------------------- .../processors/interop/InteropCache.java | 16 +++ .../processors/interop/InteropTarget.java | 4 +- .../interop/InteropTargetAdapter.java | 50 ++++++++- .../processors/interop/InteropUtils.java | 4 +- .../ignite-interop-api/ignite-interop-api.cpp | 44 ++++++-- .../ignite-interop-api/ignite-interop-api.h | 8 ++ .../src/ignite_cache.cpp | 102 ++++++++++++++++++- 7 files changed, 213 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5fa19eb6/modules/core/src/main/java/org/apache/ignite/internal/processors/interop/InteropCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/interop/InteropCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/interop/InteropCache.java index 6b2e8e0..d17d7b3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/interop/InteropCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/interop/InteropCache.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.interop; import org.apache.ignite.*; +import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.interop.*; @@ -67,4 +68,19 @@ public class InteropCache extends InteropTargetAdapter { marsh.writeObject(val, out); } + + /** {@inheritDoc} */ + @Override protected IgniteInternalFuture<Void> inOutAsyncOp(int type, + InteropInputStream in, + InteropOutputStream out, + InteropMarshaller marsh) + { + Object key = marsh.readObject(in); + + Object val = marsh.readObject(in); + + log.info("Interop put async [key=" + key + ", val=" + val + ']'); + + return cache.putxAsync(key, val); + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5fa19eb6/modules/core/src/main/java/org/apache/ignite/internal/processors/interop/InteropTarget.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/interop/InteropTarget.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/interop/InteropTarget.java index bbb99bc..28a4794 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/interop/InteropTarget.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/interop/InteropTarget.java @@ -46,9 +46,9 @@ public interface InteropTarget { * @param ptr Input data pointer. * @param len Input data length. * @param cb Callback address. - * @param cbArg Value passed to callback. + * @param cbData Value passed to callback. * @throws IgniteCheckedException If failed. */ - public void inOutAsyncOp(int type, long ptr, int len, long cb, long cbArg) + public void inOutOpAsync(int type, long ptr, int len, long cb, long cbData) throws IgniteCheckedException; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5fa19eb6/modules/core/src/main/java/org/apache/ignite/internal/processors/interop/InteropTargetAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/interop/InteropTargetAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/interop/InteropTargetAdapter.java index 58f5f3e..7b29301 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/interop/InteropTargetAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/interop/InteropTargetAdapter.java @@ -18,6 +18,8 @@ package org.apache.ignite.internal.processors.interop; import org.apache.ignite.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.interop.*; /** @@ -87,6 +89,52 @@ public abstract class InteropTargetAdapter implements InteropTarget { throws IgniteCheckedException; /** {@inheritDoc} */ - @Override public void inOutAsyncOp(int type, long ptr, int len, long cb, long cbArg) throws IgniteCheckedException { + @Override public void inOutOpAsync(int type, + long ptr, + int len, + final long cb, + final long cbData) + throws IgniteCheckedException + { + InteropOffheapInputStream in = new InteropOffheapInputStream(ptr, len); + + InteropMarshaller marsh = proc.marshaller(); + + InteropOffheapOutputStream out = new InteropOffheapOutputStream(1024); + + IgniteInternalFuture<Void> fut = inOutAsyncOp(type, in, out, marsh); + + fut.listen(new CI1<IgniteInternalFuture>() { + @Override public void apply(final IgniteInternalFuture fut) { + proc.context().closure().runLocalSafe(new Runnable() { + @Override public void run() { + try { + Thread.sleep(500); + + fut.get(); + + InteropUtils.asyncCallback(cb, cbData, 1, 0); + } + catch (Throwable e) { + e.printStackTrace(); + + InteropUtils.asyncCallback(cb, cbData, -1, 0); + } + } + }); + } + }); } + + /** + * @param type Type. + * @param in Input. + * @param out Output. + * @param marsh Marshaller. + * @return Future. + */ + protected abstract IgniteInternalFuture<Void> inOutAsyncOp(int type, + InteropInputStream in, + InteropOutputStream out, + InteropMarshaller marsh); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5fa19eb6/modules/core/src/main/java/org/apache/ignite/internal/processors/interop/InteropUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/interop/InteropUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/interop/InteropUtils.java index 73987e3..ed3c521 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/interop/InteropUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/interop/InteropUtils.java @@ -37,6 +37,8 @@ public class InteropUtils { /** * @param cb Callback address. * @param cbArg Value passed to callback. + * @param resType Result type. + * @param res Result. */ - native public void asyncCallback(long cb, long cbArg); + native public static void asyncCallback(long cb, long cbArg, int resType, long res); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5fa19eb6/modules/interop/src/main/cpp/ignite-interop/ignite-interop-api/ignite-interop-api.cpp ---------------------------------------------------------------------- diff --git a/modules/interop/src/main/cpp/ignite-interop/ignite-interop-api/ignite-interop-api.cpp b/modules/interop/src/main/cpp/ignite-interop/ignite-interop-api/ignite-interop-api.cpp index a2db741..a28cf2f 100644 --- a/modules/interop/src/main/cpp/ignite-interop/ignite-interop-api/ignite-interop-api.cpp +++ b/modules/interop/src/main/cpp/ignite-interop/ignite-interop-api/ignite-interop-api.cpp @@ -61,6 +61,7 @@ JniMethod M_IGNITION_EX_START_WITH_CLO = JniMethod("startWithClosure", "(Ljava/l const char* C_INTEROP_UTILS = "org/apache/ignite/internal/processors/interop/InteropUtils"; JniMethod M_INTEROP_UTILS_INTEROP = JniMethod("interop", "(Lorg/apache/ignite/Ignite;)Lorg/apache/ignite/internal/processors/interop/InteropProcessor;", true); +JniMethod M_INTEROP_UTILS_ASYNC_CALLBACK = JniMethod("asyncCallback", "(JJIJ)V", true); const char* C_INTEROP_PROCESSOR = "org/apache/ignite/internal/processors/interop/InteropProcessor"; JniMethod M_INTEROP_PROCESSOR_CACHE = JniMethod("cache", "(Ljava/lang/String;)Lorg/apache/ignite/internal/processors/interop/InteropTarget;", false); @@ -68,6 +69,7 @@ JniMethod M_INTEROP_PROCESSOR_CACHE = JniMethod("cache", "(Ljava/lang/String;)Lo const char* C_INTEROP_TARGET = "org/apache/ignite/internal/processors/interop/InteropTargetAdapter"; JniMethod M_INTEROP_TARGET_IN_OP = JniMethod("inOp", "(IJI)I", false); JniMethod M_INTEROP_TARGET_IN_OUT_OP = JniMethod("inOutOp", "(IJI)J", false); +JniMethod M_INTEROP_TARGET_IN_OUT_OP_ASYNC = JniMethod("inOutOpAsync", "(IJIJJ)V", false); const char* C_INTEROP_CACHE = "org/apache/ignite/internal/processors/interop/InteropCache"; @@ -99,6 +101,12 @@ jmethodID FindMethod(JNIEnv* env, jclass cls, JniMethod mthd) { return res; } +JNIEXPORT void JNICALL JniAsyncCallback(JNIEnv *env, jclass cls, jlong cb, jlong cbData, jint resType, jlong res) { + IgniteAsyncCallback cbPtr = (IgniteAsyncCallback)cb; + cbPtr((void*)cbData, resType, (void*)res); +} + + /* Internal context initialization routine. */ int ContextInit0(JavaVMInitArgs args, JavaVM** retJvm, JNIEnv** retEnv) { // 1. Check if another JVM is already started. @@ -186,6 +194,11 @@ int ContextInit0(JavaVMInitArgs args, JavaVM** retJvm, JNIEnv** retEnv) { if (!ctx->m_InteropUtils_interop) return JNI_ERR; + ctx->m_InteropUtils_asyncCallback = FindMethod(env, ctx->c_InteropUtils, M_INTEROP_UTILS_ASYNC_CALLBACK); + + if (!ctx->m_InteropUtils_asyncCallback) + return JNI_ERR; + ctx->c_InteropTarget = FindClass(env, C_INTEROP_TARGET); if (!ctx->c_InteropTarget) @@ -201,6 +214,11 @@ int ContextInit0(JavaVMInitArgs args, JavaVM** retJvm, JNIEnv** retEnv) { if (!ctx->m_InteropTarget_inOutOp) return JNI_ERR; + ctx->m_InteropTarget_inOutOpAsync = FindMethod(env, ctx->c_InteropTarget, M_INTEROP_TARGET_IN_OUT_OP_ASYNC); + + if (!ctx->m_InteropTarget_inOutOpAsync) + return JNI_ERR; + ctx->c_InteropCache = FindClass(env, C_INTEROP_CACHE); if (!ctx->c_InteropCache) @@ -208,22 +226,20 @@ int ContextInit0(JavaVMInitArgs args, JavaVM** retJvm, JNIEnv** retEnv) { // 4. Register natives. - /* { - JNINativeMethod methods[22]; + JNINativeMethod methods[1]; int idx = 0; - methods[idx].name = (char*)M_GRID_INTEROP_UTILS_ON_START.name; - methods[idx].signature = (char*)M_GRID_INTEROP_UTILS_ON_START.sign; - methods[idx++].fnPtr = JniOnStart; + methods[idx].name = (char*)M_INTEROP_UTILS_ASYNC_CALLBACK.name; + methods[idx].signature = (char*)M_INTEROP_UTILS_ASYNC_CALLBACK.sign; + methods[idx++].fnPtr = JniAsyncCallback; - res = env->RegisterNatives(ctx->c_GridInteropUtils, methods, idx); + res = env->RegisterNatives(ctx->c_InteropUtils, methods, idx); if (res != JNI_OK) return res; } - */ // JNI Env is only necessary for error handling, so we nullify to keep invariant. *retEnv = NULL; @@ -471,6 +487,16 @@ void* IgniteInteropAbstractTarget::inOutOp(jint type, void* ptr, jint len) { return (void*)res; } +void IgniteInteropAbstractTarget::inOpAsync(jint type, void* ptr, jint len, IgniteAsyncCallback cb, void* data) { + JNIEnv* env = Attach(); + + env->CallNonvirtualVoidMethod(this->obj, this->cls, Context()->m_InteropTarget_inOutOpAsync, type, ptr, len, cb, data); + + if (env->ExceptionCheck()) { + printError(env); + } +} + IGNITE_API_IMPORT_EXPORT IgniteInteropNode* StartNode() { JNIEnv* env = Attach(); @@ -546,6 +572,10 @@ void IgniteInteropCache::put(void* ptr, jint len) { this->inOp(0, ptr, len); } +void IgniteInteropCache::putAsync(void* ptr, jint len, IgniteAsyncCallback cb, void* data) { + this->inOpAsync(0, ptr, len, cb, data); +} + void* IgniteInteropCache::get(void* ptr, jint len) { return this->inOutOp(0, ptr, len); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5fa19eb6/modules/interop/src/main/cpp/ignite-interop/ignite-interop-api/ignite-interop-api.h ---------------------------------------------------------------------- diff --git a/modules/interop/src/main/cpp/ignite-interop/ignite-interop-api/ignite-interop-api.h b/modules/interop/src/main/cpp/ignite-interop/ignite-interop-api/ignite-interop-api.h index 3849a30..607ab16 100644 --- a/modules/interop/src/main/cpp/ignite-interop/ignite-interop-api/ignite-interop-api.h +++ b/modules/interop/src/main/cpp/ignite-interop/ignite-interop-api/ignite-interop-api.h @@ -54,6 +54,7 @@ struct JniContext { jclass c_InteropUtils; jmethodID m_InteropUtils_interop; + jmethodID m_InteropUtils_asyncCallback; jclass c_InteropProcessor; jmethodID m_InteropProcessor_cache; @@ -61,6 +62,7 @@ struct JniContext { jclass c_InteropTarget; jmethodID m_InteropTarget_inOp; jmethodID m_InteropTarget_inOutOp; + jmethodID m_InteropTarget_inOutOpAsync; jclass c_InteropCache; }; @@ -291,6 +293,8 @@ private: InteropByteBuffer bytes; }; +typedef void(*IgniteAsyncCallback)(void* data, int resType, void* res); + class IGNITE_API_IMPORT_EXPORT IgniteInteropAbstractTarget { protected: /** Target class for non-virtual invokes. */ @@ -309,6 +313,8 @@ public: jint inOp(jint type, void* ptr, jint len); void* inOutOp(jint type, void* ptr, jint len); + + void inOpAsync(jint type, void* ptr, jint len, IgniteAsyncCallback cb, void* data); }; class IGNITE_API_IMPORT_EXPORT IgniteInteropCache : public IgniteInteropAbstractTarget { @@ -317,6 +323,8 @@ public: void put(void* ptr, jint len); + void putAsync(void* ptr, jint len, IgniteAsyncCallback cb, void* data); + void* get(void* ptr, jint len); }; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5fa19eb6/modules/interop/src/main/cpp/ignite-interop/ignite-nodejs-prototype/src/ignite_cache.cpp ---------------------------------------------------------------------- diff --git a/modules/interop/src/main/cpp/ignite-interop/ignite-nodejs-prototype/src/ignite_cache.cpp b/modules/interop/src/main/cpp/ignite-interop/ignite-nodejs-prototype/src/ignite_cache.cpp index 32a6f6d..c8b20a2 100644 --- a/modules/interop/src/main/cpp/ignite-interop/ignite-nodejs-prototype/src/ignite_cache.cpp +++ b/modules/interop/src/main/cpp/ignite-interop/ignite-nodejs-prototype/src/ignite_cache.cpp @@ -145,11 +145,15 @@ void AfterQueueWork(uv_work_t* req) } void freeCallback(uv_async_t *async) { + std::cout << "NodeJs free async handle"; + delete async; } void AfterExecute(uv_async_t *async) { + std::cout << "NodeJs after execute"; + Isolate* isolate = Isolate::GetCurrent(); HandleScope scope(isolate); @@ -177,8 +181,10 @@ void AfterExecute(uv_async_t *async) argv); } -void igniteAsyncCallback(AsyncData* asyncData) +void igniteAsyncCallback(AsyncData* asyncData, int resType, void* res) { + std::cout << "NodeJs async callback, resType=" << resType << "\n"; + uv_async_send(asyncData->async); } @@ -201,7 +207,8 @@ void IgniteCache::PutAsync(const FunctionCallbackInfo<Value>& args) { AsyncData* asyncData = new AsyncData(); - asyncData->cb = Persistent<Function>(isolate, cb); + // asyncData = Persistent<Function>(isolate, cb); + asyncData->cb.Reset(isolate, cb); std::cout << "Put async key=" << key << ", val=" << val << "\n"; @@ -218,7 +225,7 @@ void IgniteCache::PutAsync(const FunctionCallbackInfo<Value>& args) { uv_async_init(uv_default_loop(), async, AfterExecute); - cache->cache->put(out.data(), out.size()); + cache->cache->putAsync(out.data(), out.size(), (IgniteAsyncCallback)igniteAsyncCallback, asyncData); // uv_queue_work(uv_default_loop(), 0, QueueWorkNoop, (uv_after_work_cb)AfterQueueWork); @@ -226,6 +233,32 @@ void IgniteCache::PutAsync(const FunctionCallbackInfo<Value>& args) { //args.GetReturnValue().Set(Number::New(isolate, obj->value_)); } +void printPropertis(Local<Value> prop) { + if (prop->IsObject()) { + Local<Object> obj = Local<Object>::Cast(prop); + + std::cout << "Constructor: " << *v8::String::Utf8Value(obj->GetConstructorName()) << "\n"; + + Local<Array> props = obj->GetPropertyNames(); + + std::cout << "Properties (" << props->Length() << "):\n"; + + for (int i = 0; i < props->Length(); i++) { + Local<Value> prop = props->Get(i); + + Local<Value> val = obj->Get(prop); + + if (!val->IsFunction()) + std::cout << *v8::String::Utf8Value(prop->ToString()) << "=" << *v8::String::Utf8Value(val->ToString()) << "\n"; + + /* + std::string name(*v8::String::Utf8Value(prop->ToString())); + + if (name != "global" && name != "EventEmitter") + printPropertis(val);*/ + } + } +} void IgniteCache::ObjectInfo(const FunctionCallbackInfo<Value>& args) { Isolate* isolate = Isolate::GetCurrent(); HandleScope scope(isolate); @@ -255,4 +288,65 @@ void IgniteCache::ObjectInfo(const FunctionCallbackInfo<Value>& args) { std::cout << *v8::String::Utf8Value(prop->ToString()) << "=" << *v8::String::Utf8Value(val->ToString()) << "\n"; } } -} \ No newline at end of file + + Handle<v8::Object> global = isolate->GetCurrentContext()->Global(); + + Local<Object> process = Local<Object>::Cast(global->Get(String::NewFromUtf8(isolate, "process"))); + + printPropertis(process); + /* + Handle<v8::Object> global = isolate->GetCurrentContext()->Global(); + + Local<Object> process = Local<Object>::Cast(global->Get(String::NewFromUtf8(isolate, "process"))); + + Local<Object> module = Local<Object>::Cast(process->Get(String::NewFromUtf8(isolate, "mainModule"))); + + Local<Object> children = Local<Object>::Cast(module->Get(String::NewFromUtf8(isolate, "children"))); + + printPropertis(children); + */ + /* + Local<Object> obj = Local<Object>::Cast(args[0]); + + std::cout << "Constructor: " << *v8::String::Utf8Value(obj->GetConstructorName()) << "\n"; + + Local<Array> props = obj->GetPropertyNames(); + + std::cout << "Properties (" << props->Length() << "):\n"; + + for (int i = 0; i < props->Length(); i++) { + Local<Value> prop = props->Get(i); + + Local<Value> val = obj->Get(prop); + + if (!val->IsFunction()) { + std::cout << *v8::String::Utf8Value(prop->ToString()) << "=" << *v8::String::Utf8Value(val->ToString()) << "\n"; + } + } + + Local<v8::Context> context = obj->CreationContext(); + + Handle<v8::Object> global = context->Global(); + + props = global->GetPropertyNames(); + + std::cout << "Global properties (" << props->Length() << "):\n"; + + for (int i = 0; i < props->Length(); i++) { + Local<Value> prop = props->Get(i); + + Local<Value> val = obj->Get(prop); + + //if (!val->IsFunction()) { + std::cout << *v8::String::Utf8Value(prop->ToString()) << "=" << *v8::String::Utf8Value(val->ToString()) << "\n"; + //} + } + + Handle<v8::Value> value = global->Get(String::NewFromUtf8(isolate, "Apple")); + + if (value->IsFunction()) { + std::cout << "Found function!"; + } + else + std::cout << "Function not found!";*/ +}