[
https://issues.apache.org/jira/browse/HADOOP-15327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17631149#comment-17631149
]
ASF GitHub Bot commented on HADOOP-15327:
-----------------------------------------
K0K0V0K commented on code in PR #3259:
URL: https://github.com/apache/hadoop/pull/3259#discussion_r1018076397
##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java:
##########
@@ -182,19 +184,29 @@ public class ShuffleHandler extends AuxiliaryService {
public static final HttpResponseStatus TOO_MANY_REQ_STATUS =
new HttpResponseStatus(429, "TOO MANY REQUESTS");
- // This should kept in sync with Fetcher.FETCH_RETRY_DELAY_DEFAULT
+ // This should be kept in sync with Fetcher.FETCH_RETRY_DELAY_DEFAULT
public static final long FETCH_RETRY_DELAY = 1000L;
public static final String RETRY_AFTER_HEADER = "Retry-After";
+ static final String ENCODER_HANDLER_NAME = "encoder";
private int port;
- private ChannelFactory selector;
- private final ChannelGroup accepted = new DefaultChannelGroup();
+ private EventLoopGroup bossGroup;
+ private EventLoopGroup workerGroup;
+ private ServerBootstrap bootstrap;
+ private Channel ch;
+ private final ChannelGroup accepted =
+ new DefaultChannelGroup(new DefaultEventExecutorGroup(5).next());
Review Comment:
Maybe there could be a line comment explaining why we have to create 5 event executors.
##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java:
##########
@@ -72,19 +73,21 @@
private static final String FETCH_RETRY_AFTER_HEADER = "Retry-After";
protected final Reporter reporter;
- private enum ShuffleErrors{IO_ERROR, WRONG_LENGTH, BAD_ID, WRONG_MAP,
+ @VisibleForTesting
+ public enum ShuffleErrors{IO_ERROR, WRONG_LENGTH, BAD_ID, WRONG_MAP,
CONNECTION, WRONG_REDUCE}
-
- private final static String SHUFFLE_ERR_GRP_NAME = "Shuffle Errors";
+
+ @VisibleForTesting
+ public final static String SHUFFLE_ERR_GRP_NAME = "Shuffle Errors";
Review Comment:
Won't checkstyle complain here, expecting public static final instead of public final static?
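For what it's worth, both modifier orders are legal Java and compile identically; checkstyle's ModifierOrder check (if enabled) flags final static only because it deviates from the order suggested in the JLS. A trivial sketch (class name made up for illustration):

```java
public class ModifierOrderDemo {
    // JLS-recommended order; checkstyle's ModifierOrder check expects this
    public static final String PREFERRED = "Shuffle Errors";

    // Equivalent at runtime, but ModifierOrder flags 'final static'
    public final static String FLAGGED = "Shuffle Errors";

    public static void main(String[] args) {
        // Both orders produce identical constants
        System.out.println(PREFERRED.equals(FLAGGED)); // prints true
    }
}
```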
##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java:
##########
@@ -904,65 +990,84 @@ private List<String> splitMaps(List<String> mapq) {
}
@Override
- public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent evt)
+ public void channelActive(ChannelHandlerContext ctx)
throws Exception {
- super.channelOpen(ctx, evt);
-
- if ((maxShuffleConnections > 0) && (accepted.size() >= maxShuffleConnections)) {
+ NettyChannelHelper.channelActive(ctx.channel());
+ int numConnections = activeConnections.incrementAndGet();
+ if ((maxShuffleConnections > 0) && (numConnections > maxShuffleConnections)) {
LOG.info(String.format("Current number of shuffle connections (%d) is " +
- "greater than or equal to the max allowed shuffle connections (%d)",
+ "greater than the max allowed shuffle connections (%d)",
accepted.size(), maxShuffleConnections));
- Map<String, String> headers = new HashMap<String, String>(1);
+ Map<String, String> headers = new HashMap<>(1);
// notify fetchers to backoff for a while before closing the connection
// if the shuffle connection limit is hit. Fetchers are expected to
// handle this notification gracefully, that is, not treating this as a
// fetch failure.
headers.put(RETRY_AFTER_HEADER, String.valueOf(FETCH_RETRY_DELAY));
sendError(ctx, "", TOO_MANY_REQ_STATUS, headers);
- return;
+ } else {
+ super.channelActive(ctx);
+ accepted.add(ctx.channel());
+ LOG.debug("Added channel: {}, channel id: {}. Accepted number of connections={}",
+ ctx.channel(), ctx.channel().id(), activeConnections.get());
}
- accepted.add(evt.getChannel());
}
@Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt)
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ NettyChannelHelper.channelInactive(ctx.channel());
+ super.channelInactive(ctx);
+ int noOfConnections = activeConnections.decrementAndGet();
+ LOG.debug("New value of Accepted number of connections={}", noOfConnections);
+ }
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
- HttpRequest request = (HttpRequest) evt.getMessage();
- if (request.getMethod() != GET) {
- sendError(ctx, METHOD_NOT_ALLOWED);
- return;
+ Channel channel = ctx.channel();
+ LOG.trace("Executing channelRead, channel id: {}", channel.id());
+ HttpRequest request = (HttpRequest) msg;
+ LOG.debug("Received HTTP request: {}, channel id: {}", request, channel.id());
+ if (request.method() != GET) {
+ sendError(ctx, METHOD_NOT_ALLOWED);
+ return;
}
// Check whether the shuffle version is compatible
- if (!ShuffleHeader.DEFAULT_HTTP_HEADER_NAME.equals(
- request.headers() != null ?
- request.headers().get(ShuffleHeader.HTTP_HEADER_NAME) : null)
- || !ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION.equals(
- request.headers() != null ?
- request.headers()
- .get(ShuffleHeader.HTTP_HEADER_VERSION) : null)) {
+ String shuffleVersion = ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION;
+ String httpHeaderName = ShuffleHeader.HTTP_HEADER_NAME;
Review Comment:
Shouldn't this be DEFAULT_HTTP_HEADER_NAME?
(Also, do we need the if from line #1045?)
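The backoff contract described in the hunk above (the server answers with 429 TOO MANY REQUESTS plus a Retry-After header carrying the delay, and fetchers are expected to wait rather than count a fetch failure) could be handled on the client side roughly as below; backoffMillis is a hypothetical helper for illustration, not the actual Fetcher code, and it assumes the header value is the millisecond delay the handler writes via String.valueOf(FETCH_RETRY_DELAY):

```java
import java.util.Map;

public class RetryAfterDemo {
    // Hypothetical helper mirroring the contract: 429 means back off for the
    // number of milliseconds given in the Retry-After header.
    static long backoffMillis(int statusCode, Map<String, String> headers,
                              long defaultDelayMillis) {
        if (statusCode != 429) {
            return 0L; // no backoff requested by the server
        }
        String retryAfter = headers.get("Retry-After");
        if (retryAfter == null) {
            return defaultDelayMillis; // header missing: fall back to default
        }
        try {
            return Long.parseLong(retryAfter);
        } catch (NumberFormatException e) {
            return defaultDelayMillis; // malformed header: fall back to default
        }
    }

    public static void main(String[] args) {
        Map<String, String> headers = Map.of("Retry-After", "1000");
        System.out.println(backoffMillis(429, headers, 500L)); // prints 1000
        System.out.println(backoffMillis(200, headers, 500L)); // prints 0
    }
}
```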
##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/resources/log4j.properties:
##########
@@ -17,3 +17,5 @@ log4j.threshold=ALL
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2} (%F:%M(%L)) - %m%n
+log4j.logger.io.netty=DEBUG
+log4j.logger.org.apache.hadoop.mapred=DEBUG
Review Comment:
Won't this slow down the build process too much?
##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java:
##########
@@ -291,36 +302,86 @@ public void operationComplete(ChannelFuture future)
throws Exception {
}
}
+ static class NettyChannelHelper {
+ static ChannelFuture writeToChannel(Channel ch, Object obj) {
+ LOG.debug("Writing {} to channel: {}", obj.getClass().getSimpleName(), ch.id());
+ return ch.writeAndFlush(obj);
+ }
+
+ static ChannelFuture writeToChannelAndClose(Channel ch, Object obj) {
+ return writeToChannel(ch, obj).addListener(ChannelFutureListener.CLOSE);
+ }
+
+ static ChannelFuture writeToChannelAndAddLastHttpContent(Channel ch, HttpResponse obj) {
+ writeToChannel(ch, obj);
+ return writeLastHttpContentToChannel(ch);
+ }
+
+ static ChannelFuture writeLastHttpContentToChannel(Channel ch) {
+ LOG.debug("Writing LastHttpContent, channel id: {}", ch.id());
+ return ch.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
+ }
+
+ static ChannelFuture closeChannel(Channel ch) {
+ LOG.debug("Closing channel, channel id: {}", ch.id());
+ return ch.close();
+ }
+
+ static void closeChannels(ChannelGroup channelGroup) {
+ channelGroup.close().awaitUninterruptibly(10, TimeUnit.SECONDS);
+ }
+
+ public static ChannelFuture closeAsIdle(Channel channel, int timeout) {
+ LOG.debug("Closing channel as writer was idle for {} seconds", timeout);
+ return closeChannel(channel);
+ }
+
+ public static void channelActive(Channel ch) {
+ LOG.debug("Executing channelActive, channel id: {}", ch.id());
+ }
+
+ public static void channelInactive(Channel channel) {
+ LOG.debug("Executing channelInactive, channel id: {}", channel.id());
+ }
Review Comment:
Do we need the public keyword here?
(If not, and we change these lines anyway, maybe channelActive could be renamed to logChannelActive, same for the inactive variant, and the channel parameter could be renamed to ch.)
##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java:
##########
@@ -668,34 +1357,61 @@ protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx,
conns[i].connect();
}
- //Ensure first connections are okay
- conns[0].getInputStream();
- int rc = conns[0].getResponseCode();
- Assert.assertEquals(HttpURLConnection.HTTP_OK, rc);
-
- conns[1].getInputStream();
- rc = conns[1].getResponseCode();
- Assert.assertEquals(HttpURLConnection.HTTP_OK, rc);
-
- // This connection should be closed because it to above the limit
- try {
- rc = conns[2].getResponseCode();
- Assert.assertEquals("Expected a too-many-requests response code",
- ShuffleHandler.TOO_MANY_REQ_STATUS.getCode(), rc);
- long backoff = Long.valueOf(
- conns[2].getHeaderField(ShuffleHandler.RETRY_AFTER_HEADER));
- Assert.assertTrue("The backoff value cannot be negative.", backoff > 0);
- conns[2].getInputStream();
- Assert.fail("Expected an IOException");
- } catch (IOException ioe) {
- LOG.info("Expected - connection should not be open");
- } catch (NumberFormatException ne) {
- Assert.fail("Expected a numerical value for RETRY_AFTER header field");
- } catch (Exception e) {
- Assert.fail("Expected a IOException");
+ Map<Integer, List<HttpURLConnection>> mapOfConnections = Maps.newHashMap();
+ for (HttpURLConnection conn : conns) {
+ try {
+ conn.getInputStream();
+ } catch (IOException ioe) {
+ LOG.info("Expected - connection should not be open");
+ } catch (NumberFormatException ne) {
+ fail("Expected a numerical value for RETRY_AFTER header field");
+ } catch (Exception e) {
+ fail("Expected a IOException");
+ }
+ int statusCode = conn.getResponseCode();
+ LOG.debug("Connection status code: {}", statusCode);
+ mapOfConnections.putIfAbsent(statusCode, new ArrayList<>());
+ List<HttpURLConnection> connectionList = mapOfConnections.get(statusCode);
+ connectionList.add(conn);
}
+
+ assertEquals(String.format("Expected only %s and %s response",
+ OK_STATUS, ShuffleHandler.TOO_MANY_REQ_STATUS),
+ Sets.newHashSet(
+ HttpURLConnection.HTTP_OK,
+ ShuffleHandler.TOO_MANY_REQ_STATUS.code()),
+ mapOfConnections.keySet());
- shuffleHandler.stop();
+ List<HttpURLConnection> successfulConnections =
+ mapOfConnections.get(HttpURLConnection.HTTP_OK);
+ assertEquals(String.format("Expected exactly %d requests " +
+ "with %s response", maxAllowedConnections, OK_STATUS),
+ maxAllowedConnections, successfulConnections.size());
+
+ //Ensure exactly one connection is HTTP 429 (TOO MANY REQUESTS)
+ List<HttpURLConnection> closedConnections =
+ mapOfConnections.get(ShuffleHandler.TOO_MANY_REQ_STATUS.code());
+ assertEquals(String.format("Expected exactly %d %s response",
+ notAcceptedConnections, ShuffleHandler.TOO_MANY_REQ_STATUS),
+ notAcceptedConnections, closedConnections.size());
+
+ // This connection should be closed because it is above the maximum limit
+ HttpURLConnection conn = closedConnections.get(0);
+ assertEquals(String.format("Expected a %s response",
+ ShuffleHandler.TOO_MANY_REQ_STATUS),
+ ShuffleHandler.TOO_MANY_REQ_STATUS.code(), conn.getResponseCode());
+ long backoff = Long.parseLong(
+ conn.getHeaderField(ShuffleHandler.RETRY_AFTER_HEADER));
+ assertTrue("The backoff value cannot be negative.", backoff > 0);
+
+ shuffleHandler.stop();
+
+ //It's okay to get a ClosedChannelException.
+ //All other kinds of exceptions means something went wrong
+ assertEquals("Should have no caught exceptions",
+ Collections.emptyList(), failures.stream()
+ .filter(f -> !(f instanceof ClosedChannelException))
+ .collect(toList()));
Review Comment:
Maybe there could be a cleanup here where we call the close method on every channel, to ensure we don't keep unused resources allocated? Or would that be too much?
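If such a cleanup were added, a best-effort close loop is a common shape: close each resource in turn and swallow individual failures, so one bad close() does not prevent the rest from being released. A generic, stdlib-only sketch (class and method names are made up for illustration):

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class CloseAllDemo {
    // Hypothetical helper: best-effort close of every resource; returns how
    // many resources were closed without throwing.
    static int closeQuietly(List<? extends Closeable> resources) {
        int closed = 0;
        for (Closeable c : resources) {
            try {
                c.close();
                closed++;
            } catch (IOException e) {
                // Best effort: a real implementation would log and continue.
            }
        }
        return closed;
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger counter = new AtomicInteger();
        List<Closeable> resources = List.of(
            counter::incrementAndGet,  // Closeable is a functional interface
            counter::incrementAndGet,
            () -> { throw new IOException("already closed"); });
        System.out.println(closeQuietly(resources)); // prints 2
        System.out.println(counter.get());           // prints 2
    }
}
```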
> Upgrade MR ShuffleHandler to use Netty4
> ---------------------------------------
>
> Key: HADOOP-15327
> URL: https://issues.apache.org/jira/browse/HADOOP-15327
> Project: Hadoop Common
> Issue Type: Sub-task
> Reporter: Xiaoyu Yao
> Assignee: Szilard Nemeth
> Priority: Major
> Labels: pull-request-available
> Attachments: HADOOP-15327.001.patch, HADOOP-15327.002.patch,
> HADOOP-15327.003.patch, HADOOP-15327.004.patch, HADOOP-15327.005.patch,
> HADOOP-15327.005.patch,
> getMapOutputInfo_BlockingOperationException_awaitUninterruptibly.log,
> hades-results-20221108.zip, testfailure-testMapFileAccess-emptyresponse.zip,
> testfailure-testReduceFromPartialMem.zip
>
> Time Spent: 11.5h
> Remaining Estimate: 0h
>
> This way, we can remove the dependencies on the netty3 (jboss.netty)
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]