This is an automated email from the ASF dual-hosted git repository.

moon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new 154ee11  [ZEPPELIN-4004] Fix RemoteResource.invokeMethod()
154ee11 is described below

commit 154ee11842a9ff874a132ab2e80f6c06dfffa190
Author: Lee moon soo <m...@apache.org>
AuthorDate: Mon Feb 18 16:31:00 2019 -0800

    [ZEPPELIN-4004] Fix RemoteResource.invokeMethod()
    
    ### What is this PR for?
    RemoteResource is a representation of object in ResourcePool in Interpreter 
running on another process.
    
    RemoteResource provides a invokeMethod() to call method of an object 
remotely, which is not working now. This PR provide a fix for that. Also 
improve the test case to cover this case.
    
    Also provide small improvement to inference parameter type on remote 
invocation.
    
    ### What type of PR is it?
    Bug Fix, Improvement
    
    ### What is the Jira issue?
    https://issues.apache.org/jira/browse/ZEPPELIN-4004
    
    ### How should this be tested?
    Unittest included. CI pass.
    
    ### Questions:
    * Does the licenses files need update? no
    * Is there breaking changes for older versions? no
    * Does this needs documentation? no
    
    Author: Lee moon soo <m...@apache.org>
    
    Closes #3312 from Leemoonsoo/ZEPPELIN-4004 and squashes the following 
commits:
    
    c44dcc073 [Lee moon soo] address reference to invokeMethod is ambiguous
    1893ad150 [Lee moon soo] inference parameter type
    fe6f5ed83 [Lee moon soo] Fix RemoteResource.invokeMethod() and update 
testcase to cover it
---
 .../remote/RemoteInterpreterEventClient.java       |  98 +++----
 .../remote/RemoteInterpreterServer.java            |  38 +--
 .../interpreter/util/ByteBufferUtils.java}         |  28 +-
 .../apache/zeppelin/resource/RemoteResource.java   |   5 +-
 .../org/apache/zeppelin/resource/Resource.java     | 313 ++++++++++++++++++++-
 .../org/apache/zeppelin/resource/ResourceId.java   |   3 +-
 .../zeppelin/tabledata/ProxyRowIterator.java       |   4 +-
 .../apache/zeppelin/tabledata/TableDataProxy.java  |   4 +-
 .../interpreter/util/ByteBufferUtilTest.java       |  17 ++
 .../org/apache/zeppelin/resource/ResourceTest.java |  36 +++
 .../interpreter/RemoteInterpreterEventServer.java  |  15 +-
 .../remote/mock/MockInterpreterResourcePool.java   |   4 +-
 .../resource/DistributedResourcePoolTest.java      |   5 +-
 13 files changed, 446 insertions(+), 124 deletions(-)

diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java
index b7e77c6..5ac1c0a 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java
@@ -124,39 +124,20 @@ public class RemoteInterpreterEventClient implements 
ResourcePoolConnector,
       Object[] params) {
     LOGGER.debug("Request Invoke method {} of Resource {}", methodName, 
resourceId.getName());
 
-    return null;
-    //    InvokeResourceMethodEventMessage invokeMethod = new 
InvokeResourceMethodEventMessage(
-    //        resourceId,
-    //        methodName,
-    //        paramTypes,
-    //        params,
-    //        null);
-    //
-    //    synchronized (getInvokeResponse) {
-    //      // wait for previous response consumed
-    //      while (getInvokeResponse.containsKey(invokeMethod)) {
-    //        try {
-    //          getInvokeResponse.wait();
-    //        } catch (InterruptedException e) {
-    //          LOGGER.warn(e.getMessage(), e);
-    //        }
-    //      }
-    //      // send request
-    //      sendEvent(new RemoteInterpreterEvent(
-    //          RemoteInterpreterEventType.RESOURCE_INVOKE_METHOD,
-    //          invokeMethod.toJson()));
-    //      // wait for response
-    //      while (!getInvokeResponse.containsKey(invokeMethod)) {
-    //        try {
-    //          getInvokeResponse.wait();
-    //        } catch (InterruptedException e) {
-    //          LOGGER.warn(e.getMessage(), e);
-    //        }
-    //      }
-    //      Object o = getInvokeResponse.remove(invokeMethod);
-    //      getInvokeResponse.notifyAll();
-    //      return o;
-    //    }
+    InvokeResourceMethodEventMessage invokeMethod = new 
InvokeResourceMethodEventMessage(
+            resourceId,
+            methodName,
+            paramTypes,
+            params,
+            null);
+    try {
+      ByteBuffer buffer = intpEventServiceClient.invokeMethod(intpGroupId, 
invokeMethod.toJson());
+      Object o = Resource.deserializeObject(buffer);
+      return o;
+    } catch (TException | IOException | ClassNotFoundException e) {
+      LOGGER.error("Failed to invoke method", e);
+      return null;
+    }
   }
 
   /**
@@ -178,39 +159,24 @@ public class RemoteInterpreterEventClient implements 
ResourcePoolConnector,
       String returnResourceName) {
     LOGGER.debug("Request Invoke method {} of Resource {}", methodName, 
resourceId.getName());
 
-    return null;
-    //    InvokeResourceMethodEventMessage invokeMethod = new 
InvokeResourceMethodEventMessage(
-    //        resourceId,
-    //        methodName,
-    //        paramTypes,
-    //        params,
-    //        returnResourceName);
-    //
-    //    synchronized (getInvokeResponse) {
-    //      // wait for previous response consumed
-    //      while (getInvokeResponse.containsKey(invokeMethod)) {
-    //        try {
-    //          getInvokeResponse.wait();
-    //        } catch (InterruptedException e) {
-    //          LOGGER.warn(e.getMessage(), e);
-    //        }
-    //      }
-    //      // send request
-    //      sendEvent(new RemoteInterpreterEvent(
-    //          RemoteInterpreterEventType.RESOURCE_INVOKE_METHOD,
-    //          invokeMethod.toJson()));
-    //      // wait for response
-    //      while (!getInvokeResponse.containsKey(invokeMethod)) {
-    //        try {
-    //          getInvokeResponse.wait();
-    //        } catch (InterruptedException e) {
-    //          LOGGER.warn(e.getMessage(), e);
-    //        }
-    //      }
-    //      Resource o = (Resource) getInvokeResponse.remove(invokeMethod);
-    //      getInvokeResponse.notifyAll();
-    //      return o;
-    //    }
+    InvokeResourceMethodEventMessage invokeMethod = new 
InvokeResourceMethodEventMessage(
+            resourceId,
+            methodName,
+            paramTypes,
+            params,
+            returnResourceName);
+
+    try {
+      ByteBuffer serializedResource = 
intpEventServiceClient.invokeMethod(intpGroupId, invokeMethod.toJson());
+      Resource deserializedResource = (Resource) 
Resource.deserializeObject(serializedResource);
+      RemoteResource remoteResource = 
RemoteResource.fromJson(gson.toJson(deserializedResource));
+      remoteResource.setResourcePoolConnector(this);
+
+      return remoteResource;
+    } catch (TException | IOException | ClassNotFoundException e) {
+      LOGGER.error("Failed to invoke method", e);
+      return null;
+    }
   }
 
   public synchronized void onInterpreterOutputAppend(
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
index c50b8a4..08fd2f7 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
@@ -943,7 +943,6 @@ public class RemoteInterpreterServer extends Thread
     for (Resource r : resourceSet) {
       result.add(r.toJson());
     }
-
     return result;
   }
 
@@ -977,7 +976,6 @@ public class RemoteInterpreterServer extends Thread
       String noteId, String paragraphId, String resourceName, String 
invokeMessage) {
     InvokeResourceMethodEventMessage message =
         InvokeResourceMethodEventMessage.fromJson(invokeMessage);
-
     Resource resource = resourcePool.get(noteId, paragraphId, resourceName, 
false);
     if (resource == null || resource.get() == null) {
       return ByteBuffer.allocate(0);
@@ -991,13 +989,20 @@ public class RemoteInterpreterServer extends Thread
         if (message.shouldPutResultIntoResourcePool()) {
           // if return resource name is specified,
           // then put result into resource pool
-          // and return empty byte buffer
+          // and return the Resource class instead of actual return object.
           resourcePool.put(
               noteId,
               paragraphId,
               message.returnResourceName,
               ret);
-          return ByteBuffer.allocate(0);
+
+          Resource returnValResource = resourcePool.get(noteId, paragraphId, 
message.returnResourceName);
+          ByteBuffer serialized = Resource.serializeObject(returnValResource);
+          if (serialized == null) {
+            return ByteBuffer.allocate(0);
+          } else {
+            return serialized;
+          }
         } else {
           // if return resource name is not specified,
           // then return serialized result
@@ -1015,31 +1020,6 @@ public class RemoteInterpreterServer extends Thread
     }
   }
 
-  //  /**
-  //   * Get payload of resource from remote
-  //   *
-  //   * @param invokeResourceMethodEventMessage json serialized 
InvokeResourcemethodEventMessage
-  //   * @param object                           java serialized of the object
-  //   * @throws TException
-  //   */
-  //  @Override
-  //  public void resourceResponseInvokeMethod(
-  //      String invokeResourceMethodEventMessage, ByteBuffer object) throws 
TException {
-  //    InvokeResourceMethodEventMessage message =
-  //        
InvokeResourceMethodEventMessage.fromJson(invokeResourceMethodEventMessage);
-  //
-  //    if (message.shouldPutResultIntoResourcePool()) {
-  //      Resource resource = resourcePool.get(
-  //          message.resourceId.getNoteId(),
-  //          message.resourceId.getParagraphId(),
-  //          message.returnResourceName,
-  //          true);
-  //      eventClient.putResponseInvokeMethod(message, resource);
-  //    } else {
-  //      eventClient.putResponseInvokeMethod(message, object);
-  //    }
-  //  }
-
   @Override
   public void angularRegistryPush(String registryAsString) throws TException {
     try {
diff --git 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/ResourceTest.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/util/ByteBufferUtils.java
similarity index 61%
copy from 
zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/ResourceTest.java
copy to 
zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/util/ByteBufferUtils.java
index fb8b271..aff7588 100644
--- 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/ResourceTest.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/util/ByteBufferUtils.java
@@ -14,22 +14,24 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.zeppelin.resource;
+package org.apache.zeppelin.interpreter.util;
 
-import org.junit.Test;
-
-import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
 
-import static org.junit.Assert.assertEquals;
+public class ByteBufferUtils {
+  public static ByteBuffer stringToByteBuffer(String msg, Charset charset){
+    return ByteBuffer.wrap(msg.getBytes(charset));
+  }
 
-/**
- * Test for Resource
- */
-public class ResourceTest {
-  @Test
-  public void testSerializeDeserialize() throws IOException, 
ClassNotFoundException {
-    ByteBuffer buffer = Resource.serializeObject("hello");
-    assertEquals("hello", Resource.deserializeObject(buffer));
+  public static String ByteBufferToString(ByteBuffer buffer, Charset charset){
+    byte[] bytes;
+    if(buffer.hasArray()) {
+      bytes = buffer.array();
+    } else {
+      bytes = new byte[buffer.remaining()];
+      buffer.get(bytes);
+    }
+    return new String(bytes, charset);
   }
 }
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/RemoteResource.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/RemoteResource.java
index 874c1cb..19d84a0 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/RemoteResource.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/RemoteResource.java
@@ -17,15 +17,16 @@
 package org.apache.zeppelin.resource;
 
 import com.google.gson.Gson;
+import java.io.Serializable;
 import org.apache.zeppelin.common.JsonSerializable;
 
 /**
  * Resource that can retrieve data from remote
  */
-public class RemoteResource extends Resource implements JsonSerializable {
+public class RemoteResource extends Resource implements JsonSerializable, 
Serializable {
   private static final Gson gson = new Gson();
 
-  ResourcePoolConnector resourcePoolConnector;
+  transient ResourcePoolConnector resourcePoolConnector;
 
   RemoteResource(ResourceId resourceId, Object r) {
     super(null, resourceId, r);
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/Resource.java 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/Resource.java
index ec95ffb..c671707 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/Resource.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/Resource.java
@@ -17,6 +17,10 @@
 package org.apache.zeppelin.resource;
 
 import com.google.gson.Gson;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.util.ArrayList;
 import org.apache.zeppelin.common.JsonSerializable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -33,7 +37,7 @@ import java.nio.ByteBuffer;
 /**
  * Information and reference to the resource
  */
-public class Resource implements JsonSerializable {
+public class Resource implements JsonSerializable, Serializable {
   private static final Gson gson = new Gson();
 
   private final transient Object r;
@@ -111,6 +115,206 @@ public class Resource implements JsonSerializable {
     return true;
   }
 
+  /**
+   * Invoke a method without param
+   * @param methodName
+   * @return
+   */
+  public Object invokeMethod(String methodName) {
+    return invokeMethod(methodName, (Class []) null, (Object []) null);
+  }
+
+  /**
+   * Invoke a method and store result in ResourcePool
+   * @param methodName
+   * @param returnResourceName
+   * @return
+   */
+  public Resource invokeMethod(String methodName, String returnResourceName) {
+    return invokeMethod(methodName, (Class []) null, (Object []) null, 
returnResourceName);
+  }
+
+  /**
+   * Invoke a method with automatic parameter type inference
+   * @param methodName
+   * @param params
+   * @return
+   * @throws ClassNotFoundException
+   */
+  public Object invokeMethod(String methodName, Object [] params)
+          throws ClassNotFoundException {
+    return invokeMethod(methodName, (Type[]) null, params);
+  }
+
+  /**
+   * Invoke a method with automatic parameter type inference
+   * @param methodName
+   * @param params python interpreter convert python array '[]' to ArrayList 
through py4j
+   * @return
+   * @throws ClassNotFoundException
+   */
+  public Object invokeMethod(
+          String methodName, ArrayList params)
+          throws ClassNotFoundException {
+    Object[] paramsArray = params.toArray(new Object[]{});
+    return invokeMethod(methodName, paramsArray);
+  }
+
+  /**
+   * Invoke a method with automatic parameter type inference and store result 
in ResourcePool
+   * @param methodName
+   * @param params
+   * @param returnResourceName
+   * @return
+   * @throws ClassNotFoundException
+   */
+  public Resource invokeMethod(String methodName, Object [] params, String 
returnResourceName)
+          throws ClassNotFoundException {
+    return (Resource) invokeMethod(methodName, (Type[]) null, params, 
returnResourceName);
+  }
+
+  /**
+   * Invoke a method with automatic parameter type inference and store result 
in ResourcePool
+   * @param methodName
+   * @param params python interpreter convert python array '[]' to ArrayList 
through py4j
+   * @param returnResourceName
+   * @return
+   * @throws ClassNotFoundException
+   */
+  public Resource invokeMethod(
+          String methodName, ArrayList params, String returnResourceName)
+          throws ClassNotFoundException {
+    Object[] paramsArray = params.toArray(new Object[]{});
+    return invokeMethod(methodName, paramsArray, returnResourceName);
+  }
+
+  /**
+   * Invoke a method with given parameter class names
+   * @param methodName
+   * @param paramTypes list of fully qualified class name
+   * @param params
+   * @return
+   * @throws ClassNotFoundException
+   */
+  public Object invokeMethod(
+          String methodName, String[] paramTypes, Object[] params)
+          throws ClassNotFoundException {
+    Type [] types = typeFromName(paramTypes);
+    return invokeMethod(methodName, types, params);
+  }
+
+  /**
+   * Invoke a method with given parameter class names
+   * @param methodName
+   * @param paramTypes list of fully qualified class name. python interpreter 
convert python array '[]' to ArrayList through py4j
+   * @param params python interpreter convert python array '[]' to ArrayList 
through py4j
+   * @return
+   * @throws ClassNotFoundException
+   */
+  public Object invokeMethod(
+          String methodName, ArrayList<String> paramTypes, ArrayList params)
+          throws ClassNotFoundException {
+    String[] paramTypesArray = paramTypes.toArray(new String[]{});
+    Object[] paramsArray = params.toArray(new Object[]{});
+    return invokeMethod(methodName, paramTypesArray, paramsArray);
+  }
+
+  /**
+   * Invoke a method with given parameter class names and store result in 
ResourcePool
+   * @param methodName
+   * @param paramTypes
+   * @param params
+   * @param returnResourceName
+   * @return
+   * @throws ClassNotFoundException
+   */
+  public Resource invokeMethod(
+          String methodName, String[] paramTypes, Object[] params, String 
returnResourceName)
+          throws ClassNotFoundException {
+    Type [] types = typeFromName(paramTypes);
+    return (Resource) invokeMethod(methodName, types, params, 
returnResourceName);
+  }
+
+
+  public Resource invokeMethod(
+          String methodName, ArrayList<String> paramTypes, ArrayList params, 
String returnResourceName)
+          throws ClassNotFoundException {
+    String[] paramTypesArray = paramTypes.toArray(new String[]{});
+    Object[] paramsArray = params.toArray(new Object[]{});
+    return invokeMethod(methodName, paramTypesArray, paramsArray, 
returnResourceName);
+  }
+
+  /**
+   * Invoke a method with give parameter types
+   * @param methodName
+   * @param types
+   * @param params
+   * @return
+   * @throws ClassNotFoundException
+   */
+  public Object invokeMethod(
+          String methodName, Type[] types, Object[] params)
+          throws ClassNotFoundException {
+    return invokeMethod(methodName, types, params, null);
+  }
+
+  /**
+   * Invoke a method with given parameter type and store result in ResourcePool
+   * @param methodName
+   * @param types
+   * @param params
+   * @param returnResourceName
+   * @return
+   * @throws ClassNotFoundException
+   */
+  public Object invokeMethod(
+          String methodName, Type[] types, Object[] params, String 
returnResourceName) throws ClassNotFoundException {
+    Type[] methodTypes = null;
+    Object [] methodParams = null;
+    if (types != null) {
+      methodTypes = types;
+      methodParams = params;
+    } else {
+      // inference method param types
+      boolean found = false;
+      Method[] methods = r.getClass().getDeclaredMethods();
+      for (Method m : methods) {
+        if (!m.getName().equals(methodName)) {
+          continue;
+        }
+        Type[] paramTypes = m.getGenericParameterTypes();
+        Object[] paramValues = new Object[paramTypes.length];
+
+        int pidx = 0;
+        for (int i = 0; i < paramTypes.length; i++) {
+          if (pidx == params.length) {  // not enough param for this method 
signature
+            continue;
+          } else {
+            paramValues[i] = params[pidx++];
+          }
+        }
+
+        if (pidx == params.length) {  // param number does not match
+          found = true;
+          methodParams = paramValues;
+          methodTypes = paramTypes;
+          break;
+        }
+      }
+
+      if (!found) {
+        throw new ClassNotFoundException("No method found for given 
parameters");
+      }
+    }
+
+    Class[] classes = classFromType(methodTypes);
+
+    if (returnResourceName == null) {
+      return invokeMethod(methodName, classes, convertParams(methodTypes, 
methodParams));
+    } else {
+      return invokeMethod(methodName, classes, convertParams(methodTypes, 
methodParams), returnResourceName);
+    }
+  }
 
   /**
    * Call a method of the object that this resource holds
@@ -222,4 +426,111 @@ public class Resource implements JsonSerializable {
   public static Resource fromJson(String json) {
     return gson.fromJson(json, Resource.class);
   }
+
+  private ParameterizedType [] typeFromName(String [] classNames) throws 
ClassNotFoundException {
+    if (classNames == null) {
+      return null;
+    }
+    ParameterizedType[] types = new ParameterizedType[classNames.length];
+    for (int i = 0; i < classNames.length; i++) {
+      types[i] = typeFromName(classNames[i]);
+    }
+    return types;
+  }
+
+  private ParameterizedType typeFromName(String commaSeparatedClasses) throws 
ClassNotFoundException {
+    String[] classNames = commaSeparatedClasses.split(",");
+    Class [] arguments;
+
+    if (classNames.length > 1) {
+      arguments = new Class[classNames.length - 1];
+      for (int i = 1; i < classNames.length; i++) {
+        arguments[i - 1] = loadClass(classNames[i]);
+      }
+    } else {
+      arguments = new Class[0];
+    }
+
+    Class rawType = loadClass(classNames[0]);
+
+    return new ParameterizedType() {
+      @Override
+      public Type[] getActualTypeArguments() {
+        return arguments;
+      }
+
+      @Override
+      public Type getRawType() {
+        return rawType;
+      }
+
+      @Override
+      public Type getOwnerType() {
+        return null;
+      }
+    };
+  }
+
+  private Class [] classFromType(Type[] types) throws ClassNotFoundException {
+    Class[] cls = new Class[types.length];
+    for (int i = 0; i < types.length; i++) {
+      if (types[i] instanceof ParameterizedType) {
+        String typeName = ((ParameterizedType) 
types[i]).getRawType().getTypeName();
+        cls[i] = loadClass(typeName);
+      } else {
+        cls[i] = loadClass(types[i].getTypeName());
+      }
+    }
+    return cls;
+  }
+
+
+  private Object [] convertParams(Type[] types, Object [] params) {
+    Object [] converted = new Object[types.length];
+
+    for (int i = 0; i < types.length; i++) {
+      Type type = types[i];
+      String typeName;
+      if (type instanceof ParameterizedType) {
+        typeName = ((ParameterizedType) type).getRawType().getTypeName();
+      } else {
+        typeName = type.getTypeName();
+      }
+
+      Object param = params[i];
+      if (param == null) {
+        converted[i] = null;
+      } else if (param.getClass().getName().equals(typeName)) {
+        converted[i] = param;
+      } else {
+        // try to convert param
+        converted[i] = gson.fromJson(gson.toJson(param), type);
+      }
+    }
+
+    return converted;
+  }
+
+  private Class loadClass(String className) throws ClassNotFoundException {
+    switch(className) {
+      case "byte":
+        return byte.class;
+      case "short":
+        return short.class;
+      case "int":
+        return int.class;
+      case "long":
+        return long.class;
+      case "float":
+        return float.class;
+      case "double":
+        return double.class;
+      case "boolean":
+        return boolean.class;
+      case "char":
+        return char.class;
+      default:
+        return getClass().getClassLoader().loadClass(className);
+    }
+  }
 }
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceId.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceId.java
index bef9e3f..ce06b73 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceId.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceId.java
@@ -17,12 +17,13 @@
 package org.apache.zeppelin.resource;
 
 import com.google.gson.Gson;
+import java.io.Serializable;
 import org.apache.zeppelin.common.JsonSerializable;
 
 /**
  * Identifying resource
  */
-public class ResourceId implements JsonSerializable {
+public class ResourceId implements JsonSerializable, Serializable {
   private static final Gson gson = new Gson();
 
   private final String resourcePoolId;
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/tabledata/ProxyRowIterator.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/tabledata/ProxyRowIterator.java
index 8a59098..ceb122c 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/tabledata/ProxyRowIterator.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/tabledata/ProxyRowIterator.java
@@ -33,13 +33,13 @@ public class ProxyRowIterator implements Iterator<Row> {
 
   @Override
   public boolean hasNext() {
-    rows.invokeMethod("hasNext", null, null);
+    rows.invokeMethod("hasNext");
     return false;
   }
 
   @Override
   public Row next() {
-    return (Row) rows.invokeMethod("next", null, null);
+    return (Row) rows.invokeMethod("next");
   }
 
   @Override
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/tabledata/TableDataProxy.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/tabledata/TableDataProxy.java
index 1926528..bb1f842 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/tabledata/TableDataProxy.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/tabledata/TableDataProxy.java
@@ -33,13 +33,13 @@ public class TableDataProxy implements TableData {
   @Override
   public ColumnDef[] columns() {
     return (ColumnDef[]) resource.invokeMethod(
-        "columns", null, null);
+        "columns");
   }
 
   @Override
   public Iterator<Row> rows() {
     String resourceName = resource.getResourceId().getName() + ".rows";
-    Resource rows = resource.invokeMethod("rows", null, null, resourceName);
+    Resource rows = resource.invokeMethod("rows", resourceName);
 
     ProxyRowIterator it = new ProxyRowIterator(rows);
     return it;
diff --git 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/util/ByteBufferUtilTest.java
 
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/util/ByteBufferUtilTest.java
new file mode 100644
index 0000000..bfd40b2
--- /dev/null
+++ 
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/util/ByteBufferUtilTest.java
@@ -0,0 +1,17 @@
+package org.apache.zeppelin.interpreter.util;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class ByteBufferUtilTest {
+
+  @Test
+  public void fromByteBufferToByteBuffer() {
+    String str = "Hello world";
+    ByteBuffer byteBuffer = ByteBufferUtils.stringToByteBuffer(str, 
Charset.defaultCharset());
+    assertEquals(str, ByteBufferUtils.ByteBufferToString(byteBuffer, 
Charset.defaultCharset()));
+  }
+}
diff --git 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/ResourceTest.java
 
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/ResourceTest.java
index fb8b271..211d85d 100644
--- 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/ResourceTest.java
+++ 
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/ResourceTest.java
@@ -16,6 +16,9 @@
  */
 package org.apache.zeppelin.resource;
 
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.Arrays;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -32,4 +35,37 @@ public class ResourceTest {
     ByteBuffer buffer = Resource.serializeObject("hello");
     assertEquals("hello", Resource.deserializeObject(buffer));
   }
+
+  @Test
+  public void testInvokeMethod_shouldAbleToInvokeMethodWithNoParams() {
+    Resource r = new Resource(null, new ResourceId("pool1", "name1"), 
"object");
+    assertEquals(6, r.invokeMethod("length"));
+    assertEquals(6, r.invokeMethod("length", new Class[]{}, new Object[]{}));
+  }
+
+  @Test
+  public void testInvokeMethod_shouldAbleToInvokeMethodWithTypeInference() 
throws ClassNotFoundException, NoSuchMethodException, 
InvocationTargetException, IllegalAccessException {
+    Resource r = new Resource(null, new ResourceId("pool1", "name1"), 
"object");
+    assertEquals("ect", r.invokeMethod("substring", new Object[]{3}));
+    assertEquals(true, r.invokeMethod("startsWith", new Object[]{"obj"}));
+
+    assertEquals("ect", r.invokeMethod("substring", new 
ArrayList<>(Arrays.asList(3))));
+    assertEquals(true, r.invokeMethod("startsWith", new 
ArrayList<>(Arrays.asList("obj"))));
+  }
+
+  @Test
+  public void testInvokeMethod_shouldAbleToInvokeMethodWithParamClassName() 
throws ClassNotFoundException, NoSuchMethodException, 
InvocationTargetException, IllegalAccessException {
+    Resource r = new Resource(null, new ResourceId("pool1", "name1"), 
"object");
+    assertEquals("ect", r.invokeMethod("substring", new String[]{"int"}, new 
Object[]{3}));
+    assertEquals(true, r.invokeMethod("startsWith", new 
String[]{"java.lang.String"}, new Object[]{"obj"}));
+
+    assertEquals("ect", r.invokeMethod("substring", new 
ArrayList<>(Arrays.asList("int")), new ArrayList<>(Arrays.asList(3))));
+    assertEquals(true, r.invokeMethod("startsWith", new 
ArrayList<>(Arrays.asList("java.lang.String")), new 
ArrayList<>(Arrays.asList("obj"))));
+  }
+
+  @Test
+  public void testInvokeMethod_shouldAbleToInvokeMethodWithClass() throws 
ClassNotFoundException, NoSuchMethodException, InvocationTargetException, 
IllegalAccessException {
+    Resource r = new Resource(null, new ResourceId("pool1", "name1"), 
"object");
+    assertEquals(true, r.invokeMethod("startsWith", new Class[]{ 
java.lang.String.class }, new Object[]{"obj"}));
+  }
 }
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java
index 5932c12..7f00934 100644
--- 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java
@@ -327,6 +327,13 @@ public class RemoteInterpreterEventServer implements 
RemoteInterpreterEventServi
     return obj;
   }
 
+  /**
+   *
+   * @param intpGroupId caller interpreter group id
+   * @param invokeMethodJson invoke information
+   * @return
+   * @throws TException
+   */
   @Override
   public ByteBuffer invokeMethod(String intpGroupId, String invokeMethodJson) 
throws TException {
     InvokeResourceMethodEventMessage invokeMethodMessage =
@@ -339,7 +346,7 @@ public class RemoteInterpreterEventServer implements 
RemoteInterpreterEventServi
       try {
         obj = Resource.serializeObject(ret);
       } catch (IOException e) {
-        e.printStackTrace();
+        LOGGER.error("invokeMethod failed", e);
       }
     }
     return obj;
@@ -394,10 +401,8 @@ public class RemoteInterpreterEventServer implements 
RemoteInterpreterEventServi
         LOGGER.error("no resource pool");
         return null;
       }
-    } else if (interpreterSettingManager.getInterpreterGroupById(intpGroupId)
-        .getInterpreterProcess().isRunning()) {
-      ByteBuffer res = 
interpreterSettingManager.getInterpreterGroupById(intpGroupId)
-          .getInterpreterProcess().callRemoteFunction(
+    } else if (remoteInterpreterProcess.isRunning()) {
+      ByteBuffer res = remoteInterpreterProcess.callRemoteFunction(
           new RemoteInterpreterProcess.RemoteFunction<ByteBuffer>() {
             @Override
             public ByteBuffer call(RemoteInterpreterService.Client client) 
throws Exception {
diff --git 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java
 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java
index d900031..c01bbd2 100644
--- 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java
+++ 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java
@@ -92,11 +92,11 @@ public class MockInterpreterResourcePool extends 
Interpreter {
       Resource resource = resourcePool.get(noteId, paragraphId, name);
       LOGGER.info("Resource: " + resource);
       if (stmt.length >=4) {
-        Resource res = resource.invokeMethod(value, null, null, stmt[3]);
+        Resource res = resource.invokeMethod(value, stmt[3]);
         LOGGER.info("After invokeMethod: " + resource);
         ret = res.get();
       } else {
-        ret = resource.invokeMethod(value, null, null);
+        ret = resource.invokeMethod(value);
         LOGGER.info("After invokeMethod: " + ret);
       }
     }
diff --git 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java
 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java
index 925515e..54c09b6 100644
--- 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java
+++ 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java
@@ -20,6 +20,7 @@ import com.google.gson.Gson;
 import org.apache.zeppelin.interpreter.AbstractInterpreterTest;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterOption;
 import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.InterpreterSetting;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
@@ -27,6 +28,7 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import static org.apache.zeppelin.interpreter.InterpreterOption.ISOLATED;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -44,8 +46,9 @@ public class DistributedResourcePoolTest extends 
AbstractInterpreterTest {
   public void setUp() throws Exception {
     super.setUp();
     InterpreterSetting interpreterSetting = 
interpreterSettingManager.getByName("mock_resource_pool");
+    interpreterSetting.getOption().setPerNote(ISOLATED);
     intp1 = (RemoteInterpreter) interpreterSetting.getInterpreter("user1", 
"note1", "mock_resource_pool");
-    intp2 = (RemoteInterpreter) interpreterSetting.getInterpreter("user2", 
"note1", "mock_resource_pool");
+    intp2 = (RemoteInterpreter) interpreterSetting.getInterpreter("user2", 
"note2", "mock_resource_pool");
 
     context = InterpreterContext.builder()
         .setNoteId("note")

Reply via email to