Repository: spark
Updated Branches:
  refs/heads/branch-2.0 ff2e270eb -> 73014a2aa


[SPARK-17231][CORE] Avoid building debug or trace log messages unless the 
respective log level is enabled

This is simply a backport of #14798 to `branch-2.0`. This backport omits the 
change to `ExternalShuffleBlockHandler.java`. In `branch-2.0`, that file does 
not contain the log message that was patched in `master`.

Author: Michael Allman <[email protected]>

Closes #14811 from mallman/spark-17231-logging_perf_improvements-2.0_backport.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/73014a2a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/73014a2a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/73014a2a

Branch: refs/heads/branch-2.0
Commit: 73014a2aa96b538d963f360fd41bac74f358ef46
Parents: ff2e270
Author: Michael Allman <[email protected]>
Authored: Thu Aug 25 16:29:04 2016 -0700
Committer: Shixiong Zhu <[email protected]>
Committed: Thu Aug 25 16:29:04 2016 -0700

----------------------------------------------------------------------
 .../spark/network/client/TransportClient.java   | 39 ++++++++++++--------
 .../network/client/TransportClientFactory.java  |  2 +-
 .../client/TransportResponseHandler.java        | 15 ++++----
 .../spark/network/protocol/MessageDecoder.java  |  2 +-
 .../network/server/TransportChannelHandler.java |  6 +--
 .../network/server/TransportRequestHandler.java | 18 ++++-----
 .../spark/network/server/TransportServer.java   |  2 +-
 .../shuffle/ExternalShuffleBlockResolver.java   |  2 +-
 8 files changed, 47 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/73014a2a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
----------------------------------------------------------------------
diff --git 
a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
 
b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
index 64a8317..a67683b 100644
--- 
a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
+++ 
b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
@@ -43,7 +43,7 @@ import org.apache.spark.network.protocol.OneWayMessage;
 import org.apache.spark.network.protocol.RpcRequest;
 import org.apache.spark.network.protocol.StreamChunkId;
 import org.apache.spark.network.protocol.StreamRequest;
-import org.apache.spark.network.util.NettyUtils;
+import static org.apache.spark.network.util.NettyUtils.getRemoteAddress;
 
 /**
  * Client for fetching consecutive chunks of a pre-negotiated stream. This API 
is intended to allow
@@ -135,9 +135,10 @@ public class TransportClient implements Closeable {
       long streamId,
       final int chunkIndex,
       final ChunkReceivedCallback callback) {
-    final String serverAddr = NettyUtils.getRemoteAddress(channel);
     final long startTime = System.currentTimeMillis();
-    logger.debug("Sending fetch chunk request {} to {}", chunkIndex, 
serverAddr);
+    if (logger.isDebugEnabled()) {
+      logger.debug("Sending fetch chunk request {} to {}", chunkIndex, 
getRemoteAddress(channel));
+    }
 
     final StreamChunkId streamChunkId = new StreamChunkId(streamId, 
chunkIndex);
     handler.addFetchRequest(streamChunkId, callback);
@@ -148,11 +149,13 @@ public class TransportClient implements Closeable {
         public void operationComplete(ChannelFuture future) throws Exception {
           if (future.isSuccess()) {
             long timeTaken = System.currentTimeMillis() - startTime;
-            logger.trace("Sending request {} to {} took {} ms", streamChunkId, 
serverAddr,
-              timeTaken);
+            if (logger.isTraceEnabled()) {
+              logger.trace("Sending request {} to {} took {} ms", 
streamChunkId, getRemoteAddress(channel),
+                timeTaken);
+            }
           } else {
             String errorMsg = String.format("Failed to send request %s to %s: 
%s", streamChunkId,
-              serverAddr, future.cause());
+              getRemoteAddress(channel), future.cause());
             logger.error(errorMsg, future.cause());
             handler.removeFetchRequest(streamChunkId);
             channel.close();
@@ -173,9 +176,10 @@ public class TransportClient implements Closeable {
    * @param callback Object to call with the stream data.
    */
   public void stream(final String streamId, final StreamCallback callback) {
-    final String serverAddr = NettyUtils.getRemoteAddress(channel);
     final long startTime = System.currentTimeMillis();
-    logger.debug("Sending stream request for {} to {}", streamId, serverAddr);
+    if (logger.isDebugEnabled()) {
+      logger.debug("Sending stream request for {} to {}", streamId, 
getRemoteAddress(channel));
+    }
 
     // Need to synchronize here so that the callback is added to the queue and 
the RPC is
     // written to the socket atomically, so that callbacks are called in the 
right order
@@ -188,11 +192,13 @@ public class TransportClient implements Closeable {
           public void operationComplete(ChannelFuture future) throws Exception 
{
             if (future.isSuccess()) {
               long timeTaken = System.currentTimeMillis() - startTime;
-              logger.trace("Sending request for {} to {} took {} ms", 
streamId, serverAddr,
-                timeTaken);
+              if (logger.isTraceEnabled()) {
+                logger.trace("Sending request for {} to {} took {} ms", 
streamId, getRemoteAddress(channel),
+                  timeTaken);
+              }
             } else {
               String errorMsg = String.format("Failed to send request for %s 
to %s: %s", streamId,
-                serverAddr, future.cause());
+                getRemoteAddress(channel), future.cause());
               logger.error(errorMsg, future.cause());
               channel.close();
               try {
@@ -215,9 +221,10 @@ public class TransportClient implements Closeable {
    * @return The RPC's id.
    */
   public long sendRpc(ByteBuffer message, final RpcResponseCallback callback) {
-    final String serverAddr = NettyUtils.getRemoteAddress(channel);
     final long startTime = System.currentTimeMillis();
-    logger.trace("Sending RPC to {}", serverAddr);
+    if (logger.isTraceEnabled()) {
+      logger.trace("Sending RPC to {}", getRemoteAddress(channel));
+    }
 
     final long requestId = 
Math.abs(UUID.randomUUID().getLeastSignificantBits());
     handler.addRpcRequest(requestId, callback);
@@ -228,10 +235,12 @@ public class TransportClient implements Closeable {
         public void operationComplete(ChannelFuture future) throws Exception {
           if (future.isSuccess()) {
             long timeTaken = System.currentTimeMillis() - startTime;
-            logger.trace("Sending request {} to {} took {} ms", requestId, 
serverAddr, timeTaken);
+            if (logger.isTraceEnabled()) {
+              logger.trace("Sending request {} to {} took {} ms", requestId, 
getRemoteAddress(channel), timeTaken);
+            }
           } else {
             String errorMsg = String.format("Failed to send RPC %s to %s: %s", 
requestId,
-              serverAddr, future.cause());
+              getRemoteAddress(channel), future.cause());
             logger.error(errorMsg, future.cause());
             handler.removeRpcRequest(requestId);
             channel.close();

http://git-wip-us.apache.org/repos/asf/spark/blob/73014a2a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
----------------------------------------------------------------------
diff --git 
a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
 
b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
index a27aaf2..1c9916b 100644
--- 
a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
+++ 
b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
@@ -195,7 +195,7 @@ public class TransportClientFactory implements Closeable {
 
   /** Create a completely new {@link TransportClient} to the remote address. */
   private TransportClient createClient(InetSocketAddress address) throws 
IOException {
-    logger.debug("Creating new connection to " + address);
+    logger.debug("Creating new connection to {}", address);
 
     Bootstrap bootstrap = new Bootstrap();
     bootstrap.group(workerGroup)

http://git-wip-us.apache.org/repos/asf/spark/blob/73014a2a/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java
----------------------------------------------------------------------
diff --git 
a/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java
 
b/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java
index 8a69223..1796672 100644
--- 
a/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java
+++ 
b/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java
@@ -38,7 +38,7 @@ import org.apache.spark.network.protocol.StreamChunkId;
 import org.apache.spark.network.protocol.StreamFailure;
 import org.apache.spark.network.protocol.StreamResponse;
 import org.apache.spark.network.server.MessageHandler;
-import org.apache.spark.network.util.NettyUtils;
+import static org.apache.spark.network.util.NettyUtils.getRemoteAddress;
 import org.apache.spark.network.util.TransportFrameDecoder;
 
 /**
@@ -122,7 +122,7 @@ public class TransportResponseHandler extends 
MessageHandler<ResponseMessage> {
   @Override
   public void channelInactive() {
     if (numOutstandingRequests() > 0) {
-      String remoteAddress = NettyUtils.getRemoteAddress(channel);
+      String remoteAddress = getRemoteAddress(channel);
       logger.error("Still have {} requests outstanding when connection from {} 
is closed",
         numOutstandingRequests(), remoteAddress);
       failOutstandingRequests(new IOException("Connection from " + 
remoteAddress + " closed"));
@@ -132,7 +132,7 @@ public class TransportResponseHandler extends 
MessageHandler<ResponseMessage> {
   @Override
   public void exceptionCaught(Throwable cause) {
     if (numOutstandingRequests() > 0) {
-      String remoteAddress = NettyUtils.getRemoteAddress(channel);
+      String remoteAddress = getRemoteAddress(channel);
       logger.error("Still have {} requests outstanding when connection from {} 
is closed",
         numOutstandingRequests(), remoteAddress);
       failOutstandingRequests(cause);
@@ -141,13 +141,12 @@ public class TransportResponseHandler extends 
MessageHandler<ResponseMessage> {
 
   @Override
   public void handle(ResponseMessage message) throws Exception {
-    String remoteAddress = NettyUtils.getRemoteAddress(channel);
     if (message instanceof ChunkFetchSuccess) {
       ChunkFetchSuccess resp = (ChunkFetchSuccess) message;
       ChunkReceivedCallback listener = 
outstandingFetches.get(resp.streamChunkId);
       if (listener == null) {
         logger.warn("Ignoring response for block {} from {} since it is not 
outstanding",
-          resp.streamChunkId, remoteAddress);
+          resp.streamChunkId, getRemoteAddress(channel));
         resp.body().release();
       } else {
         outstandingFetches.remove(resp.streamChunkId);
@@ -159,7 +158,7 @@ public class TransportResponseHandler extends 
MessageHandler<ResponseMessage> {
       ChunkReceivedCallback listener = 
outstandingFetches.get(resp.streamChunkId);
       if (listener == null) {
         logger.warn("Ignoring response for block {} from {} ({}) since it is 
not outstanding",
-          resp.streamChunkId, remoteAddress, resp.errorString);
+          resp.streamChunkId, getRemoteAddress(channel), resp.errorString);
       } else {
         outstandingFetches.remove(resp.streamChunkId);
         listener.onFailure(resp.streamChunkId.chunkIndex, new 
ChunkFetchFailureException(
@@ -170,7 +169,7 @@ public class TransportResponseHandler extends 
MessageHandler<ResponseMessage> {
       RpcResponseCallback listener = outstandingRpcs.get(resp.requestId);
       if (listener == null) {
         logger.warn("Ignoring response for RPC {} from {} ({} bytes) since it 
is not outstanding",
-          resp.requestId, remoteAddress, resp.body().size());
+          resp.requestId, getRemoteAddress(channel), resp.body().size());
       } else {
         outstandingRpcs.remove(resp.requestId);
         try {
@@ -184,7 +183,7 @@ public class TransportResponseHandler extends 
MessageHandler<ResponseMessage> {
       RpcResponseCallback listener = outstandingRpcs.get(resp.requestId);
       if (listener == null) {
         logger.warn("Ignoring response for RPC {} from {} ({}) since it is not 
outstanding",
-          resp.requestId, remoteAddress, resp.errorString);
+          resp.requestId, getRemoteAddress(channel), resp.errorString);
       } else {
         outstandingRpcs.remove(resp.requestId);
         listener.onFailure(new RuntimeException(resp.errorString));

http://git-wip-us.apache.org/repos/asf/spark/blob/73014a2a/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageDecoder.java
----------------------------------------------------------------------
diff --git 
a/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageDecoder.java
 
b/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageDecoder.java
index 074780f..f045318 100644
--- 
a/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageDecoder.java
+++ 
b/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageDecoder.java
@@ -39,7 +39,7 @@ public final class MessageDecoder extends 
MessageToMessageDecoder<ByteBuf> {
     Message.Type msgType = Message.Type.decode(in);
     Message decoded = decode(msgType, in);
     assert decoded.type() == msgType;
-    logger.trace("Received message " + msgType + ": " + decoded);
+    logger.trace("Received message {}: {}", msgType, decoded);
     out.add(decoded);
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/73014a2a/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java
----------------------------------------------------------------------
diff --git 
a/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java
 
b/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java
index f222337..884ea7d 100644
--- 
a/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java
+++ 
b/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java
@@ -29,7 +29,7 @@ import 
org.apache.spark.network.client.TransportResponseHandler;
 import org.apache.spark.network.protocol.Message;
 import org.apache.spark.network.protocol.RequestMessage;
 import org.apache.spark.network.protocol.ResponseMessage;
-import org.apache.spark.network.util.NettyUtils;
+import static org.apache.spark.network.util.NettyUtils.getRemoteAddress;
 
 /**
  * The single Transport-level Channel handler which is used for delegating 
requests to the
@@ -76,7 +76,7 @@ public class TransportChannelHandler extends 
SimpleChannelInboundHandler<Message
 
   @Override
   public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 
throws Exception {
-    logger.warn("Exception in connection from " + 
NettyUtils.getRemoteAddress(ctx.channel()),
+    logger.warn("Exception in connection from " + 
getRemoteAddress(ctx.channel()),
       cause);
     requestHandler.exceptionCaught(cause);
     responseHandler.exceptionCaught(cause);
@@ -139,7 +139,7 @@ public class TransportChannelHandler extends 
SimpleChannelInboundHandler<Message
           System.nanoTime() - responseHandler.getTimeOfLastRequestNs() > 
requestTimeoutNs;
         if (e.state() == IdleState.ALL_IDLE && isActuallyOverdue) {
           if (responseHandler.numOutstandingRequests() > 0) {
-            String address = NettyUtils.getRemoteAddress(ctx.channel());
+            String address = getRemoteAddress(ctx.channel());
             logger.error("Connection to {} has been quiet for {} ms while 
there are outstanding " +
               "requests. Assuming connection is dead; please adjust 
spark.network.timeout if " +
               "this is wrong.", address, requestTimeoutNs / 1000 / 1000);

http://git-wip-us.apache.org/repos/asf/spark/blob/73014a2a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
----------------------------------------------------------------------
diff --git 
a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
 
b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
index bebe88e..e67a034 100644
--- 
a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
+++ 
b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
@@ -17,6 +17,7 @@
 
 package org.apache.spark.network.server;
 
+import java.net.SocketAddress;
 import java.nio.ByteBuffer;
 
 import com.google.common.base.Throwables;
@@ -42,7 +43,7 @@ import org.apache.spark.network.protocol.RpcResponse;
 import org.apache.spark.network.protocol.StreamFailure;
 import org.apache.spark.network.protocol.StreamRequest;
 import org.apache.spark.network.protocol.StreamResponse;
-import org.apache.spark.network.util.NettyUtils;
+import static org.apache.spark.network.util.NettyUtils.getRemoteAddress;
 
 /**
  * A handler that processes requests from clients and writes chunk data back. 
Each handler is
@@ -114,9 +115,9 @@ public class TransportRequestHandler extends 
MessageHandler<RequestMessage> {
   }
 
   private void processFetchRequest(final ChunkFetchRequest req) {
-    final String client = NettyUtils.getRemoteAddress(channel);
-
-    logger.trace("Received req from {} to fetch block {}", client, 
req.streamChunkId);
+    if (logger.isTraceEnabled()) {
+      logger.trace("Received req from {} to fetch block {}", 
getRemoteAddress(channel), req.streamChunkId);
+    }
 
     ManagedBuffer buf;
     try {
@@ -125,7 +126,7 @@ public class TransportRequestHandler extends 
MessageHandler<RequestMessage> {
       buf = streamManager.getChunk(req.streamChunkId.streamId, 
req.streamChunkId.chunkIndex);
     } catch (Exception e) {
       logger.error(String.format(
-        "Error opening block %s for request from %s", req.streamChunkId, 
client), e);
+        "Error opening block %s for request from %s", req.streamChunkId, 
getRemoteAddress(channel)), e);
       respond(new ChunkFetchFailure(req.streamChunkId, 
Throwables.getStackTraceAsString(e)));
       return;
     }
@@ -134,13 +135,12 @@ public class TransportRequestHandler extends 
MessageHandler<RequestMessage> {
   }
 
   private void processStreamRequest(final StreamRequest req) {
-    final String client = NettyUtils.getRemoteAddress(channel);
     ManagedBuffer buf;
     try {
       buf = streamManager.openStream(req.streamId);
     } catch (Exception e) {
       logger.error(String.format(
-        "Error opening stream %s for request from %s", req.streamId, client), 
e);
+        "Error opening stream %s for request from %s", req.streamId, 
getRemoteAddress(channel)), e);
       respond(new StreamFailure(req.streamId, 
Throwables.getStackTraceAsString(e)));
       return;
     }
@@ -189,13 +189,13 @@ public class TransportRequestHandler extends 
MessageHandler<RequestMessage> {
    * it will be logged and the channel closed.
    */
   private void respond(final Encodable result) {
-    final String remoteAddress = channel.remoteAddress().toString();
+    final SocketAddress remoteAddress = channel.remoteAddress();
     channel.writeAndFlush(result).addListener(
       new ChannelFutureListener() {
         @Override
         public void operationComplete(ChannelFuture future) throws Exception {
           if (future.isSuccess()) {
-            logger.trace(String.format("Sent result %s to client %s", result, 
remoteAddress));
+            logger.trace("Sent result {} to client {}", result, remoteAddress);
           } else {
             logger.error(String.format("Error sending result %s to %s; closing 
connection",
               result, remoteAddress), future.cause());

http://git-wip-us.apache.org/repos/asf/spark/blob/73014a2a/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java
----------------------------------------------------------------------
diff --git 
a/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java
 
b/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java
index baae235..a67db4f 100644
--- 
a/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java
+++ 
b/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java
@@ -130,7 +130,7 @@ public class TransportServer implements Closeable {
     channelFuture.syncUninterruptibly();
 
     port = ((InetSocketAddress) 
channelFuture.channel().localAddress()).getPort();
-    logger.debug("Shuffle server started on port :" + port);
+    logger.debug("Shuffle server started on port: {}", port);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/spark/blob/73014a2a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
----------------------------------------------------------------------
diff --git 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
index 54e870a..000ec13 100644
--- 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
+++ 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
@@ -244,7 +244,7 @@ public class ExternalShuffleBlockResolver {
     for (String localDir : dirs) {
       try {
         JavaUtils.deleteRecursively(new File(localDir));
-        logger.debug("Successfully cleaned up directory: " + localDir);
+        logger.debug("Successfully cleaned up directory: {}", localDir);
       } catch (Exception e) {
         logger.error("Failed to delete directory: " + localDir, e);
       }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to