This is an automated email from the ASF dual-hosted git repository. moon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push: new 154ee11 [ZEPPELIN-4004] Fix RemoteResource.invokeMethod() 154ee11 is described below commit 154ee11842a9ff874a132ab2e80f6c06dfffa190 Author: Lee moon soo <m...@apache.org> AuthorDate: Mon Feb 18 16:31:00 2019 -0800 [ZEPPELIN-4004] Fix RemoteResource.invokeMethod() ### What is this PR for? RemoteResource is a representation of an object in the ResourcePool of an Interpreter running in another process. RemoteResource provides an invokeMethod() to call a method of that object remotely, which is currently not working. This PR provides a fix for that. It also improves the test case to cover this scenario, and makes a small improvement to parameter type inference on remote invocation. ### What type of PR is it? Bug Fix, Improvement ### What is the Jira issue? https://issues.apache.org/jira/browse/ZEPPELIN-4004 ### How should this be tested? Unit test included. CI passes. ### Questions: * Do the license files need an update? no * Are there breaking changes for older versions? no * Does this need documentation? 
no Author: Lee moon soo <m...@apache.org> Closes #3312 from Leemoonsoo/ZEPPELIN-4004 and squashes the following commits: c44dcc073 [Lee moon soo] address reference to invokeMethod is ambiguous 1893ad150 [Lee moon soo] inference parameter type fe6f5ed83 [Lee moon soo] Fix RemoteResource.invokeMethod() and update testcase to cover it --- .../remote/RemoteInterpreterEventClient.java | 98 +++---- .../remote/RemoteInterpreterServer.java | 38 +-- .../interpreter/util/ByteBufferUtils.java} | 28 +- .../apache/zeppelin/resource/RemoteResource.java | 5 +- .../org/apache/zeppelin/resource/Resource.java | 313 ++++++++++++++++++++- .../org/apache/zeppelin/resource/ResourceId.java | 3 +- .../zeppelin/tabledata/ProxyRowIterator.java | 4 +- .../apache/zeppelin/tabledata/TableDataProxy.java | 4 +- .../interpreter/util/ByteBufferUtilTest.java | 17 ++ .../org/apache/zeppelin/resource/ResourceTest.java | 36 +++ .../interpreter/RemoteInterpreterEventServer.java | 15 +- .../remote/mock/MockInterpreterResourcePool.java | 4 +- .../resource/DistributedResourcePoolTest.java | 5 +- 13 files changed, 446 insertions(+), 124 deletions(-) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java index b7e77c6..5ac1c0a 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java @@ -124,39 +124,20 @@ public class RemoteInterpreterEventClient implements ResourcePoolConnector, Object[] params) { LOGGER.debug("Request Invoke method {} of Resource {}", methodName, resourceId.getName()); - return null; - // InvokeResourceMethodEventMessage invokeMethod = new InvokeResourceMethodEventMessage( - // resourceId, - // methodName, - // paramTypes, - // params, - // 
null); - // - // synchronized (getInvokeResponse) { - // // wait for previous response consumed - // while (getInvokeResponse.containsKey(invokeMethod)) { - // try { - // getInvokeResponse.wait(); - // } catch (InterruptedException e) { - // LOGGER.warn(e.getMessage(), e); - // } - // } - // // send request - // sendEvent(new RemoteInterpreterEvent( - // RemoteInterpreterEventType.RESOURCE_INVOKE_METHOD, - // invokeMethod.toJson())); - // // wait for response - // while (!getInvokeResponse.containsKey(invokeMethod)) { - // try { - // getInvokeResponse.wait(); - // } catch (InterruptedException e) { - // LOGGER.warn(e.getMessage(), e); - // } - // } - // Object o = getInvokeResponse.remove(invokeMethod); - // getInvokeResponse.notifyAll(); - // return o; - // } + InvokeResourceMethodEventMessage invokeMethod = new InvokeResourceMethodEventMessage( + resourceId, + methodName, + paramTypes, + params, + null); + try { + ByteBuffer buffer = intpEventServiceClient.invokeMethod(intpGroupId, invokeMethod.toJson()); + Object o = Resource.deserializeObject(buffer); + return o; + } catch (TException | IOException | ClassNotFoundException e) { + LOGGER.error("Failed to invoke method", e); + return null; + } } /** @@ -178,39 +159,24 @@ public class RemoteInterpreterEventClient implements ResourcePoolConnector, String returnResourceName) { LOGGER.debug("Request Invoke method {} of Resource {}", methodName, resourceId.getName()); - return null; - // InvokeResourceMethodEventMessage invokeMethod = new InvokeResourceMethodEventMessage( - // resourceId, - // methodName, - // paramTypes, - // params, - // returnResourceName); - // - // synchronized (getInvokeResponse) { - // // wait for previous response consumed - // while (getInvokeResponse.containsKey(invokeMethod)) { - // try { - // getInvokeResponse.wait(); - // } catch (InterruptedException e) { - // LOGGER.warn(e.getMessage(), e); - // } - // } - // // send request - // sendEvent(new RemoteInterpreterEvent( - // 
RemoteInterpreterEventType.RESOURCE_INVOKE_METHOD, - // invokeMethod.toJson())); - // // wait for response - // while (!getInvokeResponse.containsKey(invokeMethod)) { - // try { - // getInvokeResponse.wait(); - // } catch (InterruptedException e) { - // LOGGER.warn(e.getMessage(), e); - // } - // } - // Resource o = (Resource) getInvokeResponse.remove(invokeMethod); - // getInvokeResponse.notifyAll(); - // return o; - // } + InvokeResourceMethodEventMessage invokeMethod = new InvokeResourceMethodEventMessage( + resourceId, + methodName, + paramTypes, + params, + returnResourceName); + + try { + ByteBuffer serializedResource = intpEventServiceClient.invokeMethod(intpGroupId, invokeMethod.toJson()); + Resource deserializedResource = (Resource) Resource.deserializeObject(serializedResource); + RemoteResource remoteResource = RemoteResource.fromJson(gson.toJson(deserializedResource)); + remoteResource.setResourcePoolConnector(this); + + return remoteResource; + } catch (TException | IOException | ClassNotFoundException e) { + LOGGER.error("Failed to invoke method", e); + return null; + } } public synchronized void onInterpreterOutputAppend( diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java index c50b8a4..08fd2f7 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java @@ -943,7 +943,6 @@ public class RemoteInterpreterServer extends Thread for (Resource r : resourceSet) { result.add(r.toJson()); } - return result; } @@ -977,7 +976,6 @@ public class RemoteInterpreterServer extends Thread String noteId, String paragraphId, String resourceName, String invokeMessage) { InvokeResourceMethodEventMessage message = 
InvokeResourceMethodEventMessage.fromJson(invokeMessage); - Resource resource = resourcePool.get(noteId, paragraphId, resourceName, false); if (resource == null || resource.get() == null) { return ByteBuffer.allocate(0); @@ -991,13 +989,20 @@ public class RemoteInterpreterServer extends Thread if (message.shouldPutResultIntoResourcePool()) { // if return resource name is specified, // then put result into resource pool - // and return empty byte buffer + // and return the Resource class instead of actual return object. resourcePool.put( noteId, paragraphId, message.returnResourceName, ret); - return ByteBuffer.allocate(0); + + Resource returnValResource = resourcePool.get(noteId, paragraphId, message.returnResourceName); + ByteBuffer serialized = Resource.serializeObject(returnValResource); + if (serialized == null) { + return ByteBuffer.allocate(0); + } else { + return serialized; + } } else { // if return resource name is not specified, // then return serialized result @@ -1015,31 +1020,6 @@ public class RemoteInterpreterServer extends Thread } } - // /** - // * Get payload of resource from remote - // * - // * @param invokeResourceMethodEventMessage json serialized InvokeResourcemethodEventMessage - // * @param object java serialized of the object - // * @throws TException - // */ - // @Override - // public void resourceResponseInvokeMethod( - // String invokeResourceMethodEventMessage, ByteBuffer object) throws TException { - // InvokeResourceMethodEventMessage message = - // InvokeResourceMethodEventMessage.fromJson(invokeResourceMethodEventMessage); - // - // if (message.shouldPutResultIntoResourcePool()) { - // Resource resource = resourcePool.get( - // message.resourceId.getNoteId(), - // message.resourceId.getParagraphId(), - // message.returnResourceName, - // true); - // eventClient.putResponseInvokeMethod(message, resource); - // } else { - // eventClient.putResponseInvokeMethod(message, object); - // } - // } - @Override public void 
angularRegistryPush(String registryAsString) throws TException { try { diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/ResourceTest.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/util/ByteBufferUtils.java similarity index 61% copy from zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/ResourceTest.java copy to zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/util/ByteBufferUtils.java index fb8b271..aff7588 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/ResourceTest.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/util/ByteBufferUtils.java @@ -14,22 +14,24 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.zeppelin.resource; +package org.apache.zeppelin.interpreter.util; -import org.junit.Test; - -import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.charset.Charset; -import static org.junit.Assert.assertEquals; +public class ByteBufferUtils { + public static ByteBuffer stringToByteBuffer(String msg, Charset charset){ + return ByteBuffer.wrap(msg.getBytes(charset)); + } -/** - * Test for Resource - */ -public class ResourceTest { - @Test - public void testSerializeDeserialize() throws IOException, ClassNotFoundException { - ByteBuffer buffer = Resource.serializeObject("hello"); - assertEquals("hello", Resource.deserializeObject(buffer)); + public static String ByteBufferToString(ByteBuffer buffer, Charset charset){ + byte[] bytes; + if(buffer.hasArray()) { + bytes = buffer.array(); + } else { + bytes = new byte[buffer.remaining()]; + buffer.get(bytes); + } + return new String(bytes, charset); } } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/RemoteResource.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/RemoteResource.java index 874c1cb..19d84a0 100644 --- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/RemoteResource.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/RemoteResource.java @@ -17,15 +17,16 @@ package org.apache.zeppelin.resource; import com.google.gson.Gson; +import java.io.Serializable; import org.apache.zeppelin.common.JsonSerializable; /** * Resource that can retrieve data from remote */ -public class RemoteResource extends Resource implements JsonSerializable { +public class RemoteResource extends Resource implements JsonSerializable, Serializable { private static final Gson gson = new Gson(); - ResourcePoolConnector resourcePoolConnector; + transient ResourcePoolConnector resourcePoolConnector; RemoteResource(ResourceId resourceId, Object r) { super(null, resourceId, r); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/Resource.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/Resource.java index ec95ffb..c671707 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/Resource.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/Resource.java @@ -17,6 +17,10 @@ package org.apache.zeppelin.resource; import com.google.gson.Gson; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; +import java.util.ArrayList; import org.apache.zeppelin.common.JsonSerializable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,7 +37,7 @@ import java.nio.ByteBuffer; /** * Information and reference to the resource */ -public class Resource implements JsonSerializable { +public class Resource implements JsonSerializable, Serializable { private static final Gson gson = new Gson(); private final transient Object r; @@ -111,6 +115,206 @@ public class Resource implements JsonSerializable { return true; } + /** + * Invoke a method without param + * @param methodName + * @return + */ + public Object 
invokeMethod(String methodName) { + return invokeMethod(methodName, (Class []) null, (Object []) null); + } + + /** + * Invoke a method and store result in ResourcePool + * @param methodName + * @param returnResourceName + * @return + */ + public Resource invokeMethod(String methodName, String returnResourceName) { + return invokeMethod(methodName, (Class []) null, (Object []) null, returnResourceName); + } + + /** + * Invoke a method with automatic parameter type inference + * @param methodName + * @param params + * @return + * @throws ClassNotFoundException + */ + public Object invokeMethod(String methodName, Object [] params) + throws ClassNotFoundException { + return invokeMethod(methodName, (Type[]) null, params); + } + + /** + * Invoke a method with automatic parameter type inference + * @param methodName + * @param params python interpreter convert python array '[]' to ArrayList through py4j + * @return + * @throws ClassNotFoundException + */ + public Object invokeMethod( + String methodName, ArrayList params) + throws ClassNotFoundException { + Object[] paramsArray = params.toArray(new Object[]{}); + return invokeMethod(methodName, paramsArray); + } + + /** + * Invoke a method with automatic parameter type inference and store result in ResourcePool + * @param methodName + * @param params + * @param returnResourceName + * @return + * @throws ClassNotFoundException + */ + public Resource invokeMethod(String methodName, Object [] params, String returnResourceName) + throws ClassNotFoundException { + return (Resource) invokeMethod(methodName, (Type[]) null, params, returnResourceName); + } + + /** + * Invoke a method with automatic parameter type inference and store result in ResourcePool + * @param methodName + * @param params python interpreter convert python array '[]' to ArrayList through py4j + * @param returnResourceName + * @return + * @throws ClassNotFoundException + */ + public Resource invokeMethod( + String methodName, ArrayList params, String 
returnResourceName) + throws ClassNotFoundException { + Object[] paramsArray = params.toArray(new Object[]{}); + return invokeMethod(methodName, paramsArray, returnResourceName); + } + + /** + * Invoke a method with given parameter class names + * @param methodName + * @param paramTypes list of fully qualified class name + * @param params + * @return + * @throws ClassNotFoundException + */ + public Object invokeMethod( + String methodName, String[] paramTypes, Object[] params) + throws ClassNotFoundException { + Type [] types = typeFromName(paramTypes); + return invokeMethod(methodName, types, params); + } + + /** + * Invoke a method with given parameter class names + * @param methodName + * @param paramTypes list of fully qualified class name. python interpreter convert python array '[]' to ArrayList through py4j + * @param params python interpreter convert python array '[]' to ArrayList through py4j + * @return + * @throws ClassNotFoundException + */ + public Object invokeMethod( + String methodName, ArrayList<String> paramTypes, ArrayList params) + throws ClassNotFoundException { + String[] paramTypesArray = paramTypes.toArray(new String[]{}); + Object[] paramsArray = params.toArray(new Object[]{}); + return invokeMethod(methodName, paramTypesArray, paramsArray); + } + + /** + * Invoke a method with given parameter class names and store result in ResourcePool + * @param methodName + * @param paramTypes + * @param params + * @param returnResourceName + * @return + * @throws ClassNotFoundException + */ + public Resource invokeMethod( + String methodName, String[] paramTypes, Object[] params, String returnResourceName) + throws ClassNotFoundException { + Type [] types = typeFromName(paramTypes); + return (Resource) invokeMethod(methodName, types, params, returnResourceName); + } + + + public Resource invokeMethod( + String methodName, ArrayList<String> paramTypes, ArrayList params, String returnResourceName) + throws ClassNotFoundException { + String[] 
paramTypesArray = paramTypes.toArray(new String[]{}); + Object[] paramsArray = params.toArray(new Object[]{}); + return invokeMethod(methodName, paramTypesArray, paramsArray, returnResourceName); + } + + /** + * Invoke a method with give parameter types + * @param methodName + * @param types + * @param params + * @return + * @throws ClassNotFoundException + */ + public Object invokeMethod( + String methodName, Type[] types, Object[] params) + throws ClassNotFoundException { + return invokeMethod(methodName, types, params, null); + } + + /** + * Invoke a method with given parameter type and store result in ResourcePool + * @param methodName + * @param types + * @param params + * @param returnResourceName + * @return + * @throws ClassNotFoundException + */ + public Object invokeMethod( + String methodName, Type[] types, Object[] params, String returnResourceName) throws ClassNotFoundException { + Type[] methodTypes = null; + Object [] methodParams = null; + if (types != null) { + methodTypes = types; + methodParams = params; + } else { + // inference method param types + boolean found = false; + Method[] methods = r.getClass().getDeclaredMethods(); + for (Method m : methods) { + if (!m.getName().equals(methodName)) { + continue; + } + Type[] paramTypes = m.getGenericParameterTypes(); + Object[] paramValues = new Object[paramTypes.length]; + + int pidx = 0; + for (int i = 0; i < paramTypes.length; i++) { + if (pidx == params.length) { // not enough param for this method signature + continue; + } else { + paramValues[i] = params[pidx++]; + } + } + + if (pidx == params.length) { // param number does not match + found = true; + methodParams = paramValues; + methodTypes = paramTypes; + break; + } + } + + if (!found) { + throw new ClassNotFoundException("No method found for given parameters"); + } + } + + Class[] classes = classFromType(methodTypes); + + if (returnResourceName == null) { + return invokeMethod(methodName, classes, convertParams(methodTypes, methodParams)); 
+ } else { + return invokeMethod(methodName, classes, convertParams(methodTypes, methodParams), returnResourceName); + } + } /** * Call a method of the object that this resource holds @@ -222,4 +426,111 @@ public class Resource implements JsonSerializable { public static Resource fromJson(String json) { return gson.fromJson(json, Resource.class); } + + private ParameterizedType [] typeFromName(String [] classNames) throws ClassNotFoundException { + if (classNames == null) { + return null; + } + ParameterizedType[] types = new ParameterizedType[classNames.length]; + for (int i = 0; i < classNames.length; i++) { + types[i] = typeFromName(classNames[i]); + } + return types; + } + + private ParameterizedType typeFromName(String commaSeparatedClasses) throws ClassNotFoundException { + String[] classNames = commaSeparatedClasses.split(","); + Class [] arguments; + + if (classNames.length > 1) { + arguments = new Class[classNames.length - 1]; + for (int i = 1; i < classNames.length; i++) { + arguments[i - 1] = loadClass(classNames[i]); + } + } else { + arguments = new Class[0]; + } + + Class rawType = loadClass(classNames[0]); + + return new ParameterizedType() { + @Override + public Type[] getActualTypeArguments() { + return arguments; + } + + @Override + public Type getRawType() { + return rawType; + } + + @Override + public Type getOwnerType() { + return null; + } + }; + } + + private Class [] classFromType(Type[] types) throws ClassNotFoundException { + Class[] cls = new Class[types.length]; + for (int i = 0; i < types.length; i++) { + if (types[i] instanceof ParameterizedType) { + String typeName = ((ParameterizedType) types[i]).getRawType().getTypeName(); + cls[i] = loadClass(typeName); + } else { + cls[i] = loadClass(types[i].getTypeName()); + } + } + return cls; + } + + + private Object [] convertParams(Type[] types, Object [] params) { + Object [] converted = new Object[types.length]; + + for (int i = 0; i < types.length; i++) { + Type type = types[i]; + String 
typeName; + if (type instanceof ParameterizedType) { + typeName = ((ParameterizedType) type).getRawType().getTypeName(); + } else { + typeName = type.getTypeName(); + } + + Object param = params[i]; + if (param == null) { + converted[i] = null; + } else if (param.getClass().getName().equals(typeName)) { + converted[i] = param; + } else { + // try to convert param + converted[i] = gson.fromJson(gson.toJson(param), type); + } + } + + return converted; + } + + private Class loadClass(String className) throws ClassNotFoundException { + switch(className) { + case "byte": + return byte.class; + case "short": + return short.class; + case "int": + return int.class; + case "long": + return long.class; + case "float": + return float.class; + case "double": + return double.class; + case "boolean": + return boolean.class; + case "char": + return char.class; + default: + return getClass().getClassLoader().loadClass(className); + } + } } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceId.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceId.java index bef9e3f..ce06b73 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceId.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceId.java @@ -17,12 +17,13 @@ package org.apache.zeppelin.resource; import com.google.gson.Gson; +import java.io.Serializable; import org.apache.zeppelin.common.JsonSerializable; /** * Identifying resource */ -public class ResourceId implements JsonSerializable { +public class ResourceId implements JsonSerializable, Serializable { private static final Gson gson = new Gson(); private final String resourcePoolId; diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/tabledata/ProxyRowIterator.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/tabledata/ProxyRowIterator.java index 8a59098..ceb122c 100644 --- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/tabledata/ProxyRowIterator.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/tabledata/ProxyRowIterator.java @@ -33,13 +33,13 @@ public class ProxyRowIterator implements Iterator<Row> { @Override public boolean hasNext() { - rows.invokeMethod("hasNext", null, null); + rows.invokeMethod("hasNext"); return false; } @Override public Row next() { - return (Row) rows.invokeMethod("next", null, null); + return (Row) rows.invokeMethod("next"); } @Override diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/tabledata/TableDataProxy.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/tabledata/TableDataProxy.java index 1926528..bb1f842 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/tabledata/TableDataProxy.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/tabledata/TableDataProxy.java @@ -33,13 +33,13 @@ public class TableDataProxy implements TableData { @Override public ColumnDef[] columns() { return (ColumnDef[]) resource.invokeMethod( - "columns", null, null); + "columns"); } @Override public Iterator<Row> rows() { String resourceName = resource.getResourceId().getName() + ".rows"; - Resource rows = resource.invokeMethod("rows", null, null, resourceName); + Resource rows = resource.invokeMethod("rows", resourceName); ProxyRowIterator it = new ProxyRowIterator(rows); return it; diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/util/ByteBufferUtilTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/util/ByteBufferUtilTest.java new file mode 100644 index 0000000..bfd40b2 --- /dev/null +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/util/ByteBufferUtilTest.java @@ -0,0 +1,17 @@ +package org.apache.zeppelin.interpreter.util; + +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + 
+public class ByteBufferUtilTest { + + @Test + public void fromByteBufferToByteBuffer() { + String str = "Hello world"; + ByteBuffer byteBuffer = ByteBufferUtils.stringToByteBuffer(str, Charset.defaultCharset()); + assertEquals(str, ByteBufferUtils.ByteBufferToString(byteBuffer, Charset.defaultCharset())); + } +} diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/ResourceTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/ResourceTest.java index fb8b271..211d85d 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/ResourceTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/ResourceTest.java @@ -16,6 +16,9 @@ */ package org.apache.zeppelin.resource; +import java.lang.reflect.InvocationTargetException; +import java.util.ArrayList; +import java.util.Arrays; import org.junit.Test; import java.io.IOException; @@ -32,4 +35,37 @@ public class ResourceTest { ByteBuffer buffer = Resource.serializeObject("hello"); assertEquals("hello", Resource.deserializeObject(buffer)); } + + @Test + public void testInvokeMethod_shouldAbleToInvokeMethodWithNoParams() { + Resource r = new Resource(null, new ResourceId("pool1", "name1"), "object"); + assertEquals(6, r.invokeMethod("length")); + assertEquals(6, r.invokeMethod("length", new Class[]{}, new Object[]{})); + } + + @Test + public void testInvokeMethod_shouldAbleToInvokeMethodWithTypeInference() throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException { + Resource r = new Resource(null, new ResourceId("pool1", "name1"), "object"); + assertEquals("ect", r.invokeMethod("substring", new Object[]{3})); + assertEquals(true, r.invokeMethod("startsWith", new Object[]{"obj"})); + + assertEquals("ect", r.invokeMethod("substring", new ArrayList<>(Arrays.asList(3)))); + assertEquals(true, r.invokeMethod("startsWith", new ArrayList<>(Arrays.asList("obj")))); + } + + @Test + public void 
testInvokeMethod_shouldAbleToInvokeMethodWithParamClassName() throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException { + Resource r = new Resource(null, new ResourceId("pool1", "name1"), "object"); + assertEquals("ect", r.invokeMethod("substring", new String[]{"int"}, new Object[]{3})); + assertEquals(true, r.invokeMethod("startsWith", new String[]{"java.lang.String"}, new Object[]{"obj"})); + + assertEquals("ect", r.invokeMethod("substring", new ArrayList<>(Arrays.asList("int")), new ArrayList<>(Arrays.asList(3)))); + assertEquals(true, r.invokeMethod("startsWith", new ArrayList<>(Arrays.asList("java.lang.String")), new ArrayList<>(Arrays.asList("obj")))); + } + + @Test + public void testInvokeMethod_shouldAbleToInvokeMethodWithClass() throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException { + Resource r = new Resource(null, new ResourceId("pool1", "name1"), "object"); + assertEquals(true, r.invokeMethod("startsWith", new Class[]{ java.lang.String.class }, new Object[]{"obj"})); + } } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java index 5932c12..7f00934 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java @@ -327,6 +327,13 @@ public class RemoteInterpreterEventServer implements RemoteInterpreterEventServi return obj; } + /** + * + * @param intpGroupId caller interpreter group id + * @param invokeMethodJson invoke information + * @return + * @throws TException + */ @Override public ByteBuffer invokeMethod(String intpGroupId, String invokeMethodJson) throws TException { InvokeResourceMethodEventMessage invokeMethodMessage = @@ -339,7 +346,7 @@ public 
class RemoteInterpreterEventServer implements RemoteInterpreterEventServi try { obj = Resource.serializeObject(ret); } catch (IOException e) { - e.printStackTrace(); + LOGGER.error("invokeMethod failed", e); } } return obj; @@ -394,10 +401,8 @@ public class RemoteInterpreterEventServer implements RemoteInterpreterEventServi LOGGER.error("no resource pool"); return null; } - } else if (interpreterSettingManager.getInterpreterGroupById(intpGroupId) - .getInterpreterProcess().isRunning()) { - ByteBuffer res = interpreterSettingManager.getInterpreterGroupById(intpGroupId) - .getInterpreterProcess().callRemoteFunction( + } else if (remoteInterpreterProcess.isRunning()) { + ByteBuffer res = remoteInterpreterProcess.callRemoteFunction( new RemoteInterpreterProcess.RemoteFunction<ByteBuffer>() { @Override public ByteBuffer call(RemoteInterpreterService.Client client) throws Exception { diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java index d900031..c01bbd2 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java @@ -92,11 +92,11 @@ public class MockInterpreterResourcePool extends Interpreter { Resource resource = resourcePool.get(noteId, paragraphId, name); LOGGER.info("Resource: " + resource); if (stmt.length >=4) { - Resource res = resource.invokeMethod(value, null, null, stmt[3]); + Resource res = resource.invokeMethod(value, stmt[3]); LOGGER.info("After invokeMethod: " + resource); ret = res.get(); } else { - ret = resource.invokeMethod(value, null, null); + ret = resource.invokeMethod(value); LOGGER.info("After invokeMethod: " + ret); } } diff --git 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java index 925515e..54c09b6 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java @@ -20,6 +20,7 @@ import com.google.gson.Gson; import org.apache.zeppelin.interpreter.AbstractInterpreterTest; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterException; +import org.apache.zeppelin.interpreter.InterpreterOption; import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.InterpreterSetting; import org.apache.zeppelin.interpreter.remote.RemoteInterpreter; @@ -27,6 +28,7 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; +import static org.apache.zeppelin.interpreter.InterpreterOption.ISOLATED; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -44,8 +46,9 @@ public class DistributedResourcePoolTest extends AbstractInterpreterTest { public void setUp() throws Exception { super.setUp(); InterpreterSetting interpreterSetting = interpreterSettingManager.getByName("mock_resource_pool"); + interpreterSetting.getOption().setPerNote(ISOLATED); intp1 = (RemoteInterpreter) interpreterSetting.getInterpreter("user1", "note1", "mock_resource_pool"); - intp2 = (RemoteInterpreter) interpreterSetting.getInterpreter("user2", "note1", "mock_resource_pool"); + intp2 = (RemoteInterpreter) interpreterSetting.getInterpreter("user2", "note2", "mock_resource_pool"); context = InterpreterContext.builder() .setNoteId("note")