This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push: new 8cc6c84 fix bug 8cc6c84 is described below commit 8cc6c84926261fde3c4f5266d0be84bb1f1e6dd5 Author: yaqian.zhang <598593...@qq.com> AuthorDate: Sun Apr 25 16:52:22 2021 +0800 fix bug --- .../apache/kylin/common/util/CliCommandExecutor.java | 10 +++++++++- .../java/org/apache/kylin/common/util/ServerMode.java | 3 +-- .../apache/kylin/rest/service/StreamingV2Service.java | 4 ++++ .../coordinator/client/CoordinatorClientFactory.java | 17 ++++++++++++----- .../java/org/apache/kylin/stream/core/model/Node.java | 2 ++ 5 files changed, 28 insertions(+), 8 deletions(-) diff --git a/core-common/src/main/java/org/apache/kylin/common/util/CliCommandExecutor.java b/core-common/src/main/java/org/apache/kylin/common/util/CliCommandExecutor.java index 74ea1f9..bbb4f4a 100644 --- a/core-common/src/main/java/org/apache/kylin/common/util/CliCommandExecutor.java +++ b/core-common/src/main/java/org/apache/kylin/common/util/CliCommandExecutor.java @@ -166,7 +166,7 @@ public class CliCommandExecutor { public static final String COMMAND_BLOCK_LIST = "[ &`>|{}()$;\\-#~!+*\\\\]+"; public static final String COMMAND_WHITE_LIST = "[^\\w%,@/:=?.\"\\[\\]]"; public static final String HIVE_BLOCK_LIST = "[ <>()$;\\-#!+*\"'/=%@]+"; - + public static final String HOST_NAME_WHITE_LIST = "[^-.a-zA-Z0-9]"; /** * <pre> @@ -201,6 +201,14 @@ public class CliCommandExecutor { return checkParameter(hiveProperty, HIVE_BLOCK_LIST); } + public static void checkHostName(String nodeName) { + String repaired = nodeName.replaceAll(HOST_NAME_WHITE_LIST, ""); + if (repaired.length() != nodeName.length()) { + throw new IllegalArgumentException("Detected illegal character in host name " + nodeName + " by " + + HOST_NAME_WHITE_LIST + ", operation not allowed."); + } + } + private static String checkParameter(String commandParameter, String rex) { String repaired = commandParameter.replaceAll(rex, ""); if (repaired.length() != commandParameter.length()) { diff --git a/core-common/src/main/java/org/apache/kylin/common/util/ServerMode.java b/core-common/src/main/java/org/apache/kylin/common/util/ServerMode.java index 3dfb5cd..880b933 100644 --- a/core-common/src/main/java/org/apache/kylin/common/util/ServerMode.java +++ b/core-common/src/main/java/org/apache/kylin/common/util/ServerMode.java @@ -46,8 +46,7 @@ public class ServerMode { } public boolean canServeStreamingCoordinator() { - return serverModes.contains(SERVER_MODE_ALL) - || serverModes.contains(SERVER_MODE_STREAM_COORDINATOR); + return serverModes.contains(SERVER_MODE_STREAM_COORDINATOR); } public boolean canServeAll() { diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/StreamingV2Service.java b/server-base/src/main/java/org/apache/kylin/rest/service/StreamingV2Service.java index 3ef0ee2..7cb85c6 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/StreamingV2Service.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/StreamingV2Service.java @@ -437,6 +437,10 @@ public class StreamingV2Service extends BasicService { for (Node receiver : receivers) { Future<ReceiverStats> futureStats = statsFuturesMap.get(receiver); try { + if (futureStats == null) { + logger.warn("Receiver node {} can not be connect.", receiver); + continue; + } ReceiverStats receiverStats = futureStats.get(); receiverStatsMap.put(receiver, receiverStats); } catch (InterruptedException e) { diff --git a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/client/CoordinatorClientFactory.java b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/client/CoordinatorClientFactory.java index 6c8fa16..6a894f5 100644 --- a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/client/CoordinatorClientFactory.java +++ b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/client/CoordinatorClientFactory.java @@ -18,9 +18,6 @@ package org.apache.kylin.stream.coordinator.client; -import java.net.InetAddress; -import java.net.NetworkInterface; - import org.apache.kylin.common.KylinConfig; import org.apache.kylin.stream.coordinator.Coordinator; import org.apache.kylin.stream.coordinator.StreamMetadataStore; @@ -56,8 +53,18 @@ public class CoordinatorClientFactory { logger.warn("no coordinator node registered"); return false; } - InetAddress inetAddress = InetAddress.getByName(coordinatorNode.getHost()); - return NetworkInterface.getByInetAddress(inetAddress) != null; + String hostAddr = KylinConfig.getInstanceFromEnv().getServerRestAddress(); + String[] hostAddrInfo = hostAddr.split(":"); + if (hostAddrInfo.length < 2) { + logger.error("kylin.server.host-address {} is not qualified ", hostAddr); + throw new RuntimeException("kylin.server.host-address " + hostAddr + " is not qualified"); + } + String host = hostAddrInfo[0]; + int port = Integer.parseInt(hostAddrInfo[1]); + + if (!host.equals(coordinatorNode.getHost()) || port != coordinatorNode.getPort()) { + return false; + } } catch (Exception e) { logger.error("Error when check network interface.", e); } diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/model/Node.java b/stream-core/src/main/java/org/apache/kylin/stream/core/model/Node.java index b54ab12..1801655 100644 --- a/stream-core/src/main/java/org/apache/kylin/stream/core/model/Node.java +++ b/stream-core/src/main/java/org/apache/kylin/stream/core/model/Node.java @@ -23,6 +23,7 @@ import java.util.Map; import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.kylin.common.util.CliCommandExecutor; @JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE) public class Node { @@ -35,6 +36,7 @@ public class Node { @JsonCreator public Node(@JsonProperty("host") String host, @JsonProperty("port") int port) { + CliCommandExecutor.checkHostName(host); this.host = host; this.port = port; }