ZanderXu commented on code in PR #8448:
URL: https://github.com/apache/hadoop/pull/8448#discussion_r3256057730
##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java:
##########
@@ -540,21 +569,17 @@ public void initAsyncThreadPools(Configuration
configuration) {
asyncResponderCount, new AsyncThreadFactory("Router Async Responder
#"));
}
AsyncRpcProtocolPBUtil.setAsyncResponderExecutor(routerAsyncResponderExecutor);
-
- if (routerDefaultAsyncHandlerExecutor == null) {
- LOG.info("init router async default executor handler count: {}",
asyncHandlerCountDefault);
- routerDefaultAsyncHandlerExecutor = Executors.newFixedThreadPool(
- asyncHandlerCountDefault, new AsyncThreadFactory("Router Async
Default Handler #"));
- }
}
- private void initNsAsyncHandlerCount() {
- String configNsHandler =
conf.get(DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY,
+ private Map<String, Integer> parseNsAsyncHandlerCount(Configuration config) {
+ String configNsHandler =
config.get(DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY,
DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_DEFAULT);
+ Map<String, Integer> nsAsyncHandlerCount = new HashMap<>();
if (StringUtils.isEmpty(configNsHandler)) {
- LOG.error(
- "The value of config key: {} is empty. Will use default conf.",
+ LOG.debug("No per-namespace async handler counts configured ({}). "
Review Comment:
Change the log level here from `debug` to `info` or `warn`.
##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java:
##########
@@ -504,34 +508,59 @@ public RouterRpcServer(Configuration conf, Router router,
* @param configuration the configuration.
*/
public void initAsyncThreadPools(Configuration configuration) {
- LOG.info("Begin initialize asynchronous handler and responder thread
pool.");
- initNsAsyncHandlerCount();
Set<String> allConfiguredNS =
FederationUtil.getAllConfiguredNS(configuration);
- Set<String> unassignedNS = new HashSet<>();
allConfiguredNS.add(CONCURRENT_NS);
-
+ Map<String, Integer> nsAsyncActiveHandlerCount =
parseNsAsyncHandlerCount(configuration);
+ initAsyncHandlerThreadPools(configuration, allConfiguredNS,
nsAsyncActiveHandlerCount);
+ initAsyncResponderThreadPools(configuration);
+ }
+
+ private void initAsyncHandlerThreadPools(Configuration configuration,
+ Set<String> allConfiguredNS, Map<String, Integer> nsAsyncHandlerCount) {
+ LOG.info("Initializing asynchronous handler thread pools");
+ int asyncQueueSize = configuration.getInt(DFS_ROUTER_ASYNC_RPC_QUEUE_SIZE,
+ DFS_ROUTER_ASYNC_RPC_QUEUE_SIZE_DEFAULT);
+ if (asyncQueueSize < 1) {
+ throw new IllegalArgumentException("Async queue size must be at least
1");
+ }
+ int asyncHandlerCountDefault =
configuration.getInt(DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY,
+ DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_DEFAULT);
for (String nsId : allConfiguredNS) {
int dedicatedHandlers = nsAsyncHandlerCount.getOrDefault(nsId, 0);
- LOG.info("Dedicated handlers {} for ns {} ", dedicatedHandlers, nsId);
- if (dedicatedHandlers > 0) {
- initAsyncHandlerThreadPools4Ns(nsId, dedicatedHandlers);
- LOG.info("Assigned {} async handlers to nsId {} ", dedicatedHandlers,
nsId);
+ if (dedicatedHandlers <= 0) {
+ dedicatedHandlers = asyncHandlerCountDefault;
+ LOG.info("Use default async handler count {} for ns {} to init
Executors.",
+ asyncHandlerCountDefault, nsId);
} else {
- unassignedNS.add(nsId);
+ LOG.info("Dedicated handlers {} for ns {} to init Executors",
dedicatedHandlers, nsId);
}
- }
-
- int asyncHandlerCountDefault =
configuration.getInt(DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY,
- DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_DEFAULT);
- if (!unassignedNS.isEmpty()) {
- LOG.warn("Async handler unassigned ns: {}", unassignedNS);
- LOG.info("Use default async handler count {} for unassigned ns.",
asyncHandlerCountDefault);
- for (String nsId : unassignedNS) {
- initAsyncHandlerThreadPools4Ns(nsId, asyncHandlerCountDefault);
+ if (dedicatedHandlers > 0) {
Review Comment:
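This check is redundant: the branch above already replaces a non-positive `dedicatedHandlers` with `asyncHandlerCountDefault`.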
##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java:
##########
@@ -504,34 +508,59 @@ public RouterRpcServer(Configuration conf, Router router,
* @param configuration the configuration.
*/
public void initAsyncThreadPools(Configuration configuration) {
- LOG.info("Begin initialize asynchronous handler and responder thread
pool.");
- initNsAsyncHandlerCount();
Set<String> allConfiguredNS =
FederationUtil.getAllConfiguredNS(configuration);
- Set<String> unassignedNS = new HashSet<>();
allConfiguredNS.add(CONCURRENT_NS);
-
+ Map<String, Integer> nsAsyncActiveHandlerCount =
parseNsAsyncHandlerCount(configuration);
+ initAsyncHandlerThreadPools(configuration, allConfiguredNS,
nsAsyncActiveHandlerCount);
+ initAsyncResponderThreadPools(configuration);
+ }
+
+ private void initAsyncHandlerThreadPools(Configuration configuration,
+ Set<String> allConfiguredNS, Map<String, Integer> nsAsyncHandlerCount) {
+ LOG.info("Initializing asynchronous handler thread pools");
+ int asyncQueueSize = configuration.getInt(DFS_ROUTER_ASYNC_RPC_QUEUE_SIZE,
+ DFS_ROUTER_ASYNC_RPC_QUEUE_SIZE_DEFAULT);
+ if (asyncQueueSize < 1) {
+ throw new IllegalArgumentException("Async queue size must be at least
1");
+ }
+ int asyncHandlerCountDefault =
configuration.getInt(DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY,
+ DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_DEFAULT);
Review Comment:
Throw an `IllegalArgumentException` if `asyncHandlerCountDefault <= 0`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]