[
https://issues.apache.org/jira/browse/HADOOP-15530?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Yongjun Zhang updated HADOOP-15530:
-----------------------------------
Description:
In Client.java, {{sendRpcRequest}} does the following:
{code}
  /** Initiates a rpc call by sending the rpc request to the remote server.
   * Note: this is not called from the Connection thread, but by other
   * threads.
   * @param call - the rpc request
   */
  public void sendRpcRequest(final Call call)
      throws InterruptedException, IOException {
    if (shouldCloseConnection.get()) {
      return;
    }

    // Serialize the call to be sent. This is done from the actual
    // caller thread, rather than the sendParamsExecutor thread,
    // so that if the serialization throws an error, it is reported
    // properly. This also parallelizes the serialization.
    //
    // Format of a call on the wire:
    // 0) Length of rest below (1 + 2)
    // 1) RpcRequestHeader - is serialized Delimited hence contains length
    // 2) RpcRequest
    //
    // Items '1' and '2' are prepared here.
    RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader(
        call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id, call.retry,
        clientId);

    final ResponseBuffer buf = new ResponseBuffer();
    header.writeDelimitedTo(buf);
    RpcWritable.wrap(call.rpcRequest).writeTo(buf);

    synchronized (sendRpcRequestLock) {
      Future<?> senderFuture = sendParamsExecutor.submit(new Runnable() {
        @Override
        public void run() {
          try {
            synchronized (ipcStreams.out) {
              if (shouldCloseConnection.get()) {
                return;
              }
              if (LOG.isDebugEnabled()) {
                LOG.debug(getName() + " sending #" + call.id
                    + " " + call.rpcRequest);
              }
              // RpcRequestHeader + RpcRequest
              ipcStreams.sendRequest(buf.toByteArray());
              ipcStreams.flush();
            }
          } catch (IOException e) {
            // exception at this point would leave the connection in an
            // unrecoverable state (eg half a call left on the wire).
            // So, close the connection, killing any outstanding calls
            markClosed(e);
          } finally {
            // the buffer is just an in-memory buffer, but it is still polite to
            // close early
            IOUtils.closeStream(buf);
          }
        }
      });

      try {
        senderFuture.get();
      } catch (ExecutionException e) {
        Throwable cause = e.getCause();

        // cause should only be a RuntimeException as the Runnable above
        // catches IOException
        if (cause instanceof RuntimeException) {
          throw (RuntimeException) cause;
        } else {
          throw new RuntimeException("unexpected checked exception", cause);
        }
      }
    }
  }
{code}
It's observed that the call can get stuck at {{senderFuture.get()}} with the
following stack:
{code}
"Thread-13" #40 prio=5 os_prio=0 tid=0x000000000fb0d000 nid=0xf189c waiting on condition [0x00007f697c582000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for <0x00000006187e5ec0> (a java.util.concurrent.FutureTask)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:429)
    at java.util.concurrent.FutureTask.get(FutureTask.java:191)
    at org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:1088)
    - locked <0x00000006215c1e08> (a java.lang.Object)
    at org.apache.hadoop.ipc.Client.call(Client.java:1483)
    at org.apache.hadoop.ipc.Client.call(Client.java:1441)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:230)
    at com.sun.proxy.$Proxy10.getBlockLocations(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:266)
    at sun.reflect.GeneratedMethodAccessor4.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:258)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:104)
    at com.sun.proxy.$Proxy11.getBlockLocations(Unknown Source)
    at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1323)
    at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1310)
    at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1298)
    at org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:309)
    at org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:275)
    - locked <0x00000006187e5530> (a java.lang.Object)
    at org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:267)
    at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1629)
    at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:338)
    at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:334)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:334)
{code}
Given that we support rpcTimeOut, we could choose the second, timed {{get}}
method of {{Future}} below:
{code}
    /**
     * Waits if necessary for the computation to complete, and then
     * retrieves its result.
     *
     * @return the computed result
     * @throws CancellationException if the computation was cancelled
     * @throws ExecutionException if the computation threw an
     * exception
     * @throws InterruptedException if the current thread was interrupted
     * while waiting
     */
    V get() throws InterruptedException, ExecutionException;

    /**
     * Waits if necessary for at most the given time for the computation
     * to complete, and then retrieves its result, if available.
     *
     * @param timeout the maximum time to wait
     * @param unit the time unit of the timeout argument
     * @return the computed result
     * @throws CancellationException if the computation was cancelled
     * @throws ExecutionException if the computation threw an
     * exception
     * @throws InterruptedException if the current thread was interrupted
     * while waiting
     * @throws TimeoutException if the wait timed out
     */
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
{code}
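A minimal sketch of the idea, outside of Client.java. The executor, the stuck sender task, and the {{sendWithTimeout}} helper below are all illustrative stand-ins (the timeout parameter plays the role of rpcTimeOut); the point is only that the timed {{get}} bounds the wait where the plain {{get()}} parks forever:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimedGetSketch {

    // Hypothetical helper: timeoutMs stands in for a bound derived from
    // rpcTimeOut. Returns "sent" on success, "timed out" if the sender hangs.
    static String sendWithTimeout(long timeoutMs) throws InterruptedException {
        ExecutorService sendParamsExecutor = Executors.newSingleThreadExecutor();
        try {
            // Simulate a sender stuck on the wire: the task never completes.
            Future<?> senderFuture = sendParamsExecutor.submit(() -> {
                try {
                    Thread.sleep(Long.MAX_VALUE);
                } catch (InterruptedException ignored) {
                    // interrupted on cancel/shutdown
                }
            });
            try {
                // Timed variant: bounded wait instead of the unbounded get().
                senderFuture.get(timeoutMs, TimeUnit.MILLISECONDS);
                return "sent";
            } catch (TimeoutException e) {
                // In Client.java this is where the connection could be closed
                // (e.g. via markClosed) so the caller is not parked forever.
                senderFuture.cancel(true);
                return "timed out";
            } catch (ExecutionException e) {
                throw new RuntimeException("unexpected checked exception",
                    e.getCause());
            }
        } finally {
            sendParamsExecutor.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(sendWithTimeout(100)); // prints: timed out
    }
}
```

One open question for the real patch is what to do on timeout: simply returning re-introduces the half-written-call problem the IOException path already guards against, so closing the connection (as in the IOException branch) seems the safer reaction.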
In theory, since RPC calls on the client side are serialized, we could execute
the send on the calling thread itself, instead of submitting it to a thread
pool that creates a new thread. This can be discussed in a separate jira.
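A sketch of that direct-execution alternative, reduced to its essence. The {{OutputStream}} here is a hypothetical stand-in for {{ipcStreams.out}}; the point is that with the executor removed, the write happens on the caller thread and an {{IOException}} propagates directly instead of being wrapped in an {{ExecutionException}}:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

public class DirectSendSketch {

    // Hypothetical simplification of sendRpcRequest: since calls are already
    // serialized (by sendRpcRequestLock in the real code), the write can run
    // on the caller thread; no Future, no unbounded get().
    public static void sendRpcRequest(OutputStream out, byte[] buf)
            throws IOException {
        synchronized (out) {
            // RpcRequestHeader + RpcRequest, already serialized into buf
            out.write(buf);
            out.flush();
        }
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        sendRpcRequest(out, new byte[] {1, 2, 3});
        System.out.println(out.size()); // prints: 3
    }
}
```

The trade-off is that a caller blocked on a wedged socket is then stuck in the write itself rather than in {{get()}}, so a timeout on the socket (or the timed {{get}} above) is still needed to bound the wait.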
Why the RPC was not processed and returned by the NN is a separate topic
(HADOOP-15538).
was:
In Client.java, sendRpcRequest does the following
{code}
/** Initiates a rpc call by sending the rpc request to the remote server.
* Note: this is not called from the Connection thread, but by other
* threads.
* @param call - the rpc request
*/
public void sendRpcRequest(final Call call)
throws InterruptedException, IOException {
if (shouldCloseConnection.get()) {
return;
}
// Serialize the call to be sent. This is done from the actual
// caller thread, rather than the sendParamsExecutor thread,
// so that if the serialization throws an error, it is reported
// properly. This also parallelizes the serialization.
//
// Format of a call on the wire:
// 0) Length of rest below (1 + 2)
// 1) RpcRequestHeader - is serialized Delimited hence contains length
// 2) RpcRequest
//
// Items '1' and '2' are prepared here.
RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader(
call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id, call.retry,
clientId);
final ResponseBuffer buf = new ResponseBuffer();
header.writeDelimitedTo(buf);
RpcWritable.wrap(call.rpcRequest).writeTo(buf);
synchronized (sendRpcRequestLock) {
Future<?> senderFuture = sendParamsExecutor.submit(new Runnable() {
@Override
public void run() {
try {
synchronized (ipcStreams.out) {
if (shouldCloseConnection.get()) {
return;
}
if (LOG.isDebugEnabled()) {
LOG.debug(getName() + " sending #" + call.id
+ " " + call.rpcRequest);
}
// RpcRequestHeader + RpcRequest
ipcStreams.sendRequest(buf.toByteArray());
ipcStreams.flush();
}
} catch (IOException e) {
// exception at this point would leave the connection in an
// unrecoverable state (eg half a call left on the wire).
// So, close the connection, killing any outstanding calls
markClosed(e);
} finally {
// the buffer is just an in-memory buffer, but it is still polite to
// close early
IOUtils.closeStream(buf);
}
}
});
try {
senderFuture.get();
} catch (ExecutionException e) {
Throwable cause = e.getCause();
// cause should only be a RuntimeException as the Runnable above
// catches IOException
if (cause instanceof RuntimeException) {
throw (RuntimeException) cause;
} else {
throw new RuntimeException("unexpected checked exception", cause);
}
}
}
}
{code}
It's observed that the call can be stuck at {{senderFuture.get();}}
Given that we support rpcTimeOut, we could choose the second method of Future
below:
{code}
/**
* Waits if necessary for the computation to complete, and then
* retrieves its result.
*
* @return the computed result
* @throws CancellationException if the computation was cancelled
* @throws ExecutionException if the computation threw an
* exception
* @throws InterruptedException if the current thread was interrupted
* while waiting
*/
V get() throws InterruptedException, ExecutionException;
/**
* Waits if necessary for at most the given time for the computation
* to complete, and then retrieves its result, if available.
*
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @return the computed result
* @throws CancellationException if the computation was cancelled
* @throws ExecutionException if the computation threw an
* exception
* @throws InterruptedException if the current thread was interrupted
* while waiting
* @throws TimeoutException if the wait timed out
*/
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
{code}
In theory, since RPC calls on the client side are serialized, we could execute
the send on the calling thread itself, instead of submitting it to a thread
pool. This can be discussed in a separate jira.
And why the RPC is not processed and returned by NN is another topic
(HADOOP-15538).
> RPC could stuck at senderFuture.get()
> -------------------------------------
>
> Key: HADOOP-15530
> URL: https://issues.apache.org/jira/browse/HADOOP-15530
> Project: Hadoop Common
> Issue Type: Bug
> Components: common
> Reporter: Yongjun Zhang
> Assignee: Yongjun Zhang
> Priority: Major
>
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]