This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 627b5ee302 [enhancement](k8s) Support fqdn mode for fe in k8s enviroment (#17329) 627b5ee302 is described below commit 627b5ee30298133542ae9394274cb48ef0324ba3 Author: yinzhijian <373141...@qq.com> AuthorDate: Sun Mar 5 10:18:56 2023 +0800 [enhancement](k8s) Support fqdn mode for fe in k8s enviroment (#17329) --- .../org/apache/doris/common/FeMetaVersion.java | 6 +- .../java/org/apache/doris/alter/SystemHandler.java | 12 +- .../org/apache/doris/analysis/FrontendClause.java | 22 +- .../main/java/org/apache/doris/catalog/Env.java | 280 +++++++++++++-------- .../doris/common/proc/FrontendsProcNode.java | 47 ++-- .../apache/doris/common/telemetry/Telemetry.java | 2 +- .../org/apache/doris/deploy/DeployManager.java | 50 ++-- .../src/main/java/org/apache/doris/ha/BDBHA.java | 17 ++ .../main/java/org/apache/doris/ha/MasterInfo.java | 40 ++- .../org/apache/doris/httpv2/meta/MetaService.java | 10 +- .../doris/httpv2/rest/manager/ClusterAction.java | 2 +- .../doris/httpv2/rest/manager/HttpUtils.java | 2 +- .../doris/httpv2/rest/manager/NodeAction.java | 17 +- .../httpv2/rest/manager/QueryProfileAction.java | 2 +- .../org/apache/doris/journal/JournalEntity.java | 7 +- .../apache/doris/journal/bdbje/BDBEnvironment.java | 1 - .../apache/doris/journal/bdbje/BDBJEJournal.java | 26 +- .../java/org/apache/doris/master/Checkpoint.java | 4 +- .../java/org/apache/doris/persist/EditLog.java | 9 + .../org/apache/doris/persist/OperationType.java | 2 + .../java/org/apache/doris/qe/StmtExecutor.java | 2 +- .../org/apache/doris/service/FrontendOptions.java | 17 +- .../apache/doris/service/FrontendServiceImpl.java | 2 +- .../java/org/apache/doris/system/FQDNManager.java | 30 ++- .../java/org/apache/doris/system/Frontend.java | 63 +++-- .../java/org/apache/doris/system/HeartbeatMgr.java | 7 +- .../org/apache/doris/system/SystemInfoService.java | 40 ++- .../org/apache/doris/system/FQDNManagerTest.java | 69 ++++- .../org/apache/doris/system/HeartbeatMgrTest.java | 4 +- 29 files changed, 563 insertions(+), 229 deletions(-) diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java b/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java index 5ebc0d14e7..6f6b6740b4 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java @@ -54,9 +54,11 @@ public final class FeMetaVersion { public static final int VERSION_116 = 116; // add user and comment to load job public static final int VERSION_117 = 117; - // note: when increment meta version, should assign the latest version to VERSION_CURRENT + // change frontend meta to json, add hostname to MasterInfo + public static final int VERSION_118 = 118; - public static final int VERSION_CURRENT = VERSION_117; + // note: when increment meta version, should assign the latest version to VERSION_CURRENT + public static final int VERSION_CURRENT = VERSION_118; // all logs meta version should >= the minimum version, so that we could remove many if clause, for example // if (FE_METAVERSION < VERSION_94) ... diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java index ee128d3e9f..4c779c8bcd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java @@ -153,16 +153,20 @@ public class SystemHandler extends AlterHandler { } else if (alterClause instanceof AddObserverClause) { AddObserverClause clause = (AddObserverClause) alterClause; - Env.getCurrentEnv().addFrontend(FrontendNodeType.OBSERVER, clause.getHost(), clause.getPort()); + Env.getCurrentEnv().addFrontend(FrontendNodeType.OBSERVER, clause.getIp(), clause.getHostName(), + clause.getPort()); } else if (alterClause instanceof DropObserverClause) { DropObserverClause clause = (DropObserverClause) alterClause; - Env.getCurrentEnv().dropFrontend(FrontendNodeType.OBSERVER, clause.getHost(), clause.getPort()); + Env.getCurrentEnv().dropFrontend(FrontendNodeType.OBSERVER, clause.getIp(), clause.getHostName(), + clause.getPort()); } else if (alterClause instanceof AddFollowerClause) { AddFollowerClause clause = (AddFollowerClause) alterClause; - Env.getCurrentEnv().addFrontend(FrontendNodeType.FOLLOWER, clause.getHost(), clause.getPort()); + Env.getCurrentEnv().addFrontend(FrontendNodeType.FOLLOWER, clause.getIp(), clause.getHostName(), + clause.getPort()); } else if (alterClause instanceof DropFollowerClause) { DropFollowerClause clause = (DropFollowerClause) alterClause; - Env.getCurrentEnv().dropFrontend(FrontendNodeType.FOLLOWER, clause.getHost(), clause.getPort()); + Env.getCurrentEnv().dropFrontend(FrontendNodeType.FOLLOWER, clause.getIp(), clause.getHostName(), + clause.getPort()); } else if (alterClause instanceof ModifyBrokerClause) { ModifyBrokerClause clause = (ModifyBrokerClause) alterClause; Env.getCurrentEnv().getBrokerMgr().execute(clause); diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/FrontendClause.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/FrontendClause.java index 624db6a9ea..3fd8676b50 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/FrontendClause.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/FrontendClause.java @@ -22,11 +22,11 @@ import org.apache.doris.catalog.Env; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; -import org.apache.doris.common.Pair; import org.apache.doris.ha.FrontendNodeType; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; import org.apache.doris.system.SystemInfoService; +import org.apache.doris.system.SystemInfoService.HostInfo; import com.google.common.base.Preconditions; import com.google.common.base.Strings; @@ -36,7 +36,8 @@ import java.util.Map; public class FrontendClause extends AlterClause { protected String hostPort; - protected String host; + protected String ip; + protected String hostName; protected int port; protected FrontendNodeType role; @@ -46,8 +47,12 @@ public class FrontendClause extends AlterClause { this.role = role; } - public String getHost() { - return host; + public String getIp() { + return ip; + } + + public String getHostName() { + return hostName; } public int getPort() { @@ -61,10 +66,11 @@ public class FrontendClause extends AlterClause { analyzer.getQualifiedUser()); } - Pair<String, Integer> pair = SystemInfoService.validateHostAndPort(hostPort); - this.host = pair.first; - this.port = pair.second; - Preconditions.checkState(!Strings.isNullOrEmpty(host)); + HostInfo hostInfo = SystemInfoService.getIpHostAndPort(hostPort, true); + this.ip = hostInfo.getIp(); + this.hostName = hostInfo.getHostName(); + this.port = hostInfo.getPort(); + Preconditions.checkState(!Strings.isNullOrEmpty(ip)); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index e86ab60d39..af74e0f0c4 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -220,6 +220,7 @@ import org.apache.doris.system.FQDNManager; import org.apache.doris.system.Frontend; import org.apache.doris.system.HeartbeatMgr; import org.apache.doris.system.SystemInfoService; +import org.apache.doris.system.SystemInfoService.HostInfo; import org.apache.doris.task.AgentBatchTask; import org.apache.doris.task.AgentTaskExecutor; import org.apache.doris.task.CompactionTask; @@ -351,9 +352,7 @@ public class Env { private FrontendNodeType feType; // replica and observer use this value to decide provide read service or not private long synchronizedTimeMs; - private int masterRpcPort; - private int masterHttpPort; - private String masterIp; + private MasterInfo masterInfo; private MetaIdGenerator idGenerator = new MetaIdGenerator(NEXT_ID_INIT_VALUE); @@ -366,8 +365,8 @@ public class Env { private static Env CHECKPOINT = null; private static long checkpointThreadId = -1; private Checkpoint checkpointer; - private List<Pair<String, Integer>> helperNodes = Lists.newArrayList(); - private Pair<String, Integer> selfNode = null; + private List<HostInfo> helperNodes = Lists.newArrayList(); + private HostInfo selfNode = null; // node name -> Frontend private ConcurrentHashMap<String, Frontend> frontends; @@ -576,9 +575,7 @@ public class Env { this.removedFrontends = new ConcurrentLinkedQueue<>(); this.journalObservable = new JournalObservable(); - this.masterRpcPort = 0; - this.masterHttpPort = 0; - this.masterIp = ""; + this.masterInfo = new MasterInfo(); this.systemInfo = new SystemInfoService(); this.heartbeatMgr = new HeartbeatMgr(systemInfo, !isCheckpointCatalog); @@ -926,7 +923,8 @@ public class Env { // For compatibility. Because this is the very first time to start, so we arbitrarily choose // a new name for this node role = FrontendNodeType.FOLLOWER; - nodeName = genFeNodeName(selfNode.first, selfNode.second, false /* new style */); + nodeName = genFeNodeName(Config.enable_fqdn_mode ? selfNode.getIdent() : selfNode.getIp(), + selfNode.getPort(), false /* new style */); storage.writeFrontendRoleAndNodeName(role, nodeName); LOG.info("very first time to start this node. role: {}, node name: {}", role.name(), nodeName); } else { @@ -942,24 +940,13 @@ public class Env { // But we will get a empty nodeName after upgrading. // So for forward compatibility, we use the "old-style" way of naming: "ip_port", // and update the ROLE file. - nodeName = genFeNodeName(selfNode.first, selfNode.second, true/* old style */); + nodeName = genFeNodeName(selfNode.getIp(), selfNode.getPort(), true/* old style */); storage.writeFrontendRoleAndNodeName(role, nodeName); LOG.info("forward compatibility. role: {}, node name: {}", role.name(), nodeName); - } else { - // nodeName should be like "192.168.1.1_9217_1620296111213" - // and the selfNode should be the prefix of nodeName. - // If not, it means that the ip used last time is different from this time, which is not allowed. - // But is metadata_failure_recovery is true, - // we will not check it because this may be a FE migration. - String[] split = nodeName.split("_"); - if (Config.metadata_failure_recovery.equals("false") && !selfNode.first.equalsIgnoreCase( - split[0])) { - throw new IOException( - "the self host " + selfNode.first + " does not equal to the host in ROLE" + " file " - + split[0] + ". You need to set 'priority_networks' config" - + " in fe.conf to match the host " + split[0]); - } } + // Notice: + // With the introduction of FQDN, the nodeName is no longer bound to an IP address, + // so consistency is no longer checked here. Otherwise, the startup will fail. } Preconditions.checkNotNull(role); @@ -972,7 +959,8 @@ public class Env { storage.writeClusterIdAndToken(); isFirstTimeStartUp = true; - Frontend self = new Frontend(role, nodeName, selfNode.first, selfNode.second); + Frontend self = new Frontend(role, nodeName, selfNode.getIp(), selfNode.getHostName(), + selfNode.getPort()); // We don't need to check if frontends already contains self. // frontends must be empty cause no image is loaded and no journal is replayed yet. // And this frontend will be persisted later after opening bdbje environment. @@ -1016,7 +1004,7 @@ public class Env { Preconditions.checkNotNull(role); Preconditions.checkNotNull(nodeName); - Pair<String, Integer> rightHelperNode = helperNodes.get(0); + HostInfo rightHelperNode = helperNodes.get(0); Storage storage = new Storage(this.imageDir); if (roleFile.exists() && (role != storage.getRole() || !nodeName.equals(storage.getNodeName())) @@ -1026,8 +1014,8 @@ public class Env { if (!versionFile.exists()) { // If the version file doesn't exist, download it from helper node if (!getVersionFileFromHelper(rightHelperNode)) { - throw new IOException( - "fail to download version file from " + rightHelperNode.first + " will exit."); + throw new IOException("fail to download version file from " + + rightHelperNode.getIp() + " will exit."); } // NOTE: cluster_id will be init when Storage object is constructed, @@ -1044,7 +1032,7 @@ public class Env { clusterId = storage.getClusterID(); token = storage.getToken(); try { - URL idURL = new URL("http://" + NetUtils.getHostPortInAccessibleFormat(rightHelperNode.first, Config.http_port) + "/check"); + URL idURL = new URL("http://" + NetUtils.getHostPortInAccessibleFormat(rightHelperNode.getIp(), Config.http_port) + "/check"); HttpURLConnection conn = null; conn = (HttpURLConnection) idURL.openConnection(); conn.setConnectTimeout(2 * 1000); @@ -1052,8 +1040,11 @@ public class Env { String clusterIdString = conn.getHeaderField(MetaBaseAction.CLUSTER_ID); int remoteClusterId = Integer.parseInt(clusterIdString); if (remoteClusterId != clusterId) { - LOG.error("cluster id is not equal with helper node {}. will exit.", rightHelperNode.first); - System.exit(-1); + LOG.error("cluster id is not equal with helper node {}. will exit.", + rightHelperNode.getIp()); + throw new IOException( + "cluster id is not equal with helper node " + + rightHelperNode.getIp() + ". will exit."); } String remoteToken = conn.getHeaderField(MetaBaseAction.TOKEN); if (token == null && remoteToken != null) { @@ -1067,7 +1058,8 @@ public class Env { Preconditions.checkNotNull(remoteToken); if (!token.equals(remoteToken)) { throw new IOException( - "token is not equal with helper node " + rightHelperNode.first + ". will exit."); + "token is not equal with helper node " + + rightHelperNode.getIp() + ". will exit."); } } } catch (Exception e) { @@ -1089,7 +1081,8 @@ public class Env { } Preconditions.checkState(helperNodes.size() == 1); - LOG.info("finished to get cluster id: {}, role: {} and node name: {}", clusterId, role.name(), nodeName); + LOG.info("finished to get cluster id: {}, isElectable: {}, role: {} and node name: {}", + clusterId, isElectable, role.name(), nodeName); } public static String genFeNodeName(String host, int port, boolean isOldStyle) { @@ -1106,12 +1099,14 @@ public class Env { private boolean getFeNodeTypeAndNameFromHelpers() { // we try to get info from helper nodes, once we get the right helper node, // other helper nodes will be ignored and removed. - Pair<String, Integer> rightHelperNode = null; - for (Pair<String, Integer> helperNode : helperNodes) { + HostInfo rightHelperNode = null; + for (HostInfo helperNode : helperNodes) { try { - URL url = new URL("http://" + NetUtils.getHostPortInAccessibleFormat(helperNode.first, Config.http_port) - + "/role?host=" + selfNode.first - + "&port=" + selfNode.second); + // For upgrade compatibility, the host parameter name remains the same + // and the new hostname parameter is added + URL url = new URL("http://" + NetUtils.getHostPortInAccessibleFormat(helperNode.getIp(), Config.http_port) + + "/role?host=" + selfNode.getIp() + "&hostname=" + selfNode.getHostName() + + "&port=" + selfNode.getPort()); HttpURLConnection conn = null; conn = (HttpURLConnection) url.openConnection(); if (conn.getResponseCode() != 200) { @@ -1138,14 +1133,15 @@ public class Env { if (Strings.isNullOrEmpty(nodeName)) { // For forward compatibility, we use old-style name: "ip_port" - nodeName = genFeNodeName(selfNode.first, selfNode.second, true /* old style */); + nodeName = genFeNodeName(selfNode.getIp(), selfNode.getPort(), true /* old style */); } } catch (Exception e) { LOG.warn("failed to get fe node type from helper node: {}.", helperNode, e); continue; } - LOG.info("get fe node type {}, name {} from {}:{}", role, nodeName, helperNode.first, Config.http_port); + LOG.info("get fe node type {}, name {} from {}:{}:{}", role, nodeName, + helperNode.getHostName(), helperNode.getIp(), Config.http_port); rightHelperNode = helperNode; break; } @@ -1160,7 +1156,9 @@ public class Env { } private void getSelfHostPort() { - selfNode = Pair.of(FrontendOptions.getLocalHostAddress(), Config.edit_log_port); + String hostName = Strings.nullToEmpty(FrontendOptions.getHostName()); + selfNode = new HostInfo(FrontendOptions.getLocalHostAddress(), hostName, + Config.edit_log_port); LOG.debug("get self node: {}", selfNode); } @@ -1193,8 +1191,8 @@ public class Env { if (helpers != null) { String[] splittedHelpers = helpers.split(","); for (String helper : splittedHelpers) { - Pair<String, Integer> helperHostPort = SystemInfoService.validateHostAndPort(helper); - if (helperHostPort.equals(selfNode)) { + HostInfo helperHostPort = SystemInfoService.getIpHostAndPort(helper, true); + if (helperHostPort.isSame(selfNode)) { /** * If user specified the helper node to this FE itself, * we will stop the starting FE process and report an error. @@ -1211,7 +1209,7 @@ public class Env { } } else { // If helper node is not designated, use local node as helper node. - helperNodes.add(Pair.of(selfNode.first, Config.edit_log_port)); + helperNodes.add(new HostInfo(selfNode.getIp(), selfNode.getHostName(), Config.edit_log_port)); } } @@ -1234,7 +1232,8 @@ public class Env { // This is not the first time this node start up. // It should already added to FE group, just set helper node as it self. LOG.info("role file exist. this is not the first time to start up"); - helperNodes = Lists.newArrayList(Pair.of(selfNode.first, Config.edit_log_port)); + helperNodes = Lists.newArrayList(new HostInfo(selfNode.getIp(), selfNode.getHostName(), + Config.edit_log_port)); return; } @@ -1309,11 +1308,11 @@ public class Env { // MUST set master ip before starting checkpoint thread. // because checkpoint thread need this info to select non-master FE to push image - this.masterIp = FrontendOptions.getLocalHostAddress(); - this.masterRpcPort = Config.rpc_port; - this.masterHttpPort = Config.http_port; - MasterInfo info = new MasterInfo(this.masterIp, this.masterHttpPort, this.masterRpcPort); - editLog.logMasterInfo(info); + this.masterInfo = new MasterInfo(Env.getCurrentEnv().getSelfNode().getIp(), + Env.getCurrentEnv().getSelfNode().getHostName(), + Config.http_port, + Config.rpc_port); + editLog.logMasterInfo(masterInfo); // for master, the 'isReady' is set behind. // but we are sure that all metadata is replayed if we get here. @@ -1461,7 +1460,7 @@ public class Env { if (Config.edit_log_type.equalsIgnoreCase("bdb")) { for (Frontend fe : frontends.values()) { if (fe.getRole() == FrontendNodeType.FOLLOWER || fe.getRole() == FrontendNodeType.REPLICA) { - ((BDBHA) getHaProtocol()).addHelperSocket(fe.getHost(), fe.getEditLogPort()); + ((BDBHA) getHaProtocol()).addHelperSocket(fe.getIp(), fe.getEditLogPort()); } } } @@ -1531,11 +1530,11 @@ public class Env { return; } - Frontend fe = checkFeExist(selfNode.first, selfNode.second); + Frontend fe = checkFeExist(selfNode.getIp(), selfNode.getHostName(), selfNode.getPort()); if (fe == null) { - LOG.error("current node {}:{} is not added to the cluster, will exit." + LOG.error("current node {}:{}:{} is not added to the cluster, will exit." + " Your FE IP maybe changed, please set 'priority_networks' config in fe.conf properly.", - selfNode.first, selfNode.second); + selfNode.getHostName(), selfNode.getIp(), selfNode.getPort()); System.exit(-1); } else if (fe.getRole() != role) { LOG.error("current node role is {} not match with frontend recorded role {}. will exit", role, @@ -1553,9 +1552,9 @@ public class Env { } } - private boolean getVersionFileFromHelper(Pair<String, Integer> helperNode) throws IOException { + private boolean getVersionFileFromHelper(HostInfo helperNode) throws IOException { try { - String url = "http://" + NetUtils.getHostPortInAccessibleFormat(helperNode.first, Config.http_port) + "/version"; + String url = "http://" + NetUtils.getHostPortInAccessibleFormat(helperNode.getIp(), Config.http_port) + "/version"; File dir = new File(this.imageDir); MetaHelper.getRemoteFile(url, HTTP_TIMEOUT_SECOND * 1000, MetaHelper.getOutputStream(Storage.VERSION_FILE, dir)); @@ -1568,13 +1567,13 @@ public class Env { return false; } - private void getNewImage(Pair<String, Integer> helperNode) throws IOException { + private void getNewImage(HostInfo helperNode) throws IOException { long localImageVersion = 0; Storage storage = new Storage(this.imageDir); localImageVersion = storage.getLatestImageSeq(); try { - String hostPort = NetUtils.getHostPortInAccessibleFormat(helperNode.first, Config.http_port); + String hostPort = NetUtils.getHostPortInAccessibleFormat(helperNode.getIp(), Config.http_port); URL infoUrl = new URL("http://" + hostPort + "/info"); StorageInfo info = getStorageInfo(infoUrl); long version = info.getImageSeq(); @@ -1597,10 +1596,11 @@ public class Env { Preconditions.checkNotNull(selfNode); Preconditions.checkNotNull(helperNodes); LOG.debug("self: {}. helpers: {}", selfNode, helperNodes); - // if helper nodes contain it self, remove other helpers + // if helper nodes contain itself, remove other helpers boolean containSelf = false; - for (Pair<String, Integer> helperNode : helperNodes) { - if (selfNode.equals(helperNode)) { + for (HostInfo helperNode : helperNodes) { + // WARN: cannot use equals() here, because the hostname may not equal to helperNode.getHostName() + if (selfNode.isSame(helperNode)) { containSelf = true; break; } @@ -1698,11 +1698,9 @@ public class Env { } public long loadMasterInfo(DataInputStream dis, long checksum) throws IOException { - masterIp = Text.readString(dis); - masterRpcPort = dis.readInt(); - long newChecksum = checksum ^ masterRpcPort; - masterHttpPort = dis.readInt(); - newChecksum ^= masterHttpPort; + masterInfo = MasterInfo.read(dis); + long newChecksum = checksum ^ masterInfo.getRpcPort(); + newChecksum ^= masterInfo.getHttpPort(); LOG.info("finished replay masterInfo from image"); return newChecksum; @@ -1985,14 +1983,9 @@ public class Env { } public long saveMasterInfo(CountingDataOutputStream dos, long checksum) throws IOException { - Text.writeString(dos, masterIp); - - checksum ^= masterRpcPort; - dos.writeInt(masterRpcPort); - - checksum ^= masterHttpPort; - dos.writeInt(masterHttpPort); - + masterInfo.write(dos); + checksum ^= masterInfo.getRpcPort(); + checksum ^= masterInfo.getHttpPort(); return checksum; } @@ -2452,58 +2445,100 @@ public class Env { }; } - public void addFrontend(FrontendNodeType role, String host, int editLogPort) throws DdlException { + public void addFrontend(FrontendNodeType role, String ip, String hostname, int editLogPort) throws DdlException { if (!tryLock(false)) { throw new DdlException("Failed to acquire catalog lock. Try again"); } try { - Frontend fe = checkFeExist(host, editLogPort); + Frontend fe = checkFeExist(ip, hostname, editLogPort); if (fe != null) { throw new DdlException("frontend already exists " + fe); } - + if (Config.enable_fqdn_mode && StringUtils.isEmpty(hostname)) { + throw new DdlException("frontend's hostName should not be empty while enable_fqdn_mode is true"); + } + String host = hostname != null && Config.enable_fqdn_mode ? hostname : ip; String nodeName = genFeNodeName(host, editLogPort, false /* new name style */); if (removedFrontends.contains(nodeName)) { throw new DdlException("frontend name already exists " + nodeName + ". Try again"); } - fe = new Frontend(role, nodeName, host, editLogPort); + fe = new Frontend(role, nodeName, ip, hostname, editLogPort); frontends.put(nodeName, fe); BDBHA bdbha = (BDBHA) haProtocol; if (role == FrontendNodeType.FOLLOWER || role == FrontendNodeType.REPLICA) { - bdbha.addHelperSocket(host, editLogPort); - helperNodes.add(Pair.of(host, editLogPort)); + bdbha.addHelperSocket(ip, editLogPort); + helperNodes.add(new HostInfo(ip, hostname, editLogPort)); bdbha.addUnReadyElectableNode(nodeName, getFollowerCount()); } - bdbha.removeConflictNodeIfExist(host, editLogPort); + bdbha.removeConflictNodeIfExist(ip, editLogPort); editLog.logAddFrontend(fe); } finally { unlock(); } } - public void dropFrontend(FrontendNodeType role, String host, int port) throws DdlException { - if (host.equals(selfNode.first) && port == selfNode.second && feType == FrontendNodeType.MASTER) { + public void modifyFrontendIp(String nodeName, String destIp) throws DdlException { + modifyFrontendHost(nodeName, destIp, ""); + } + + public void modifyFrontendHostName(String nodeName, String destHostName) throws DdlException { + modifyFrontendHost(nodeName, "", destHostName); + } + + public void modifyFrontendHost(String nodeName, String destIp, String destHostName) throws DdlException { + if (!tryLock(false)) { + throw new DdlException("Failed to acquire catalog lock. Try again"); + } + try { + Frontend fe = getFeByName(nodeName); + if (fe == null) { + throw new DdlException("frontend does not exist, nodeName:" + nodeName); + } + boolean needLog = false; + // we use hostname as address of bdbha, so we don't need to update node address when ip changed + if (!Strings.isNullOrEmpty(destIp) && !destIp.equals(fe.getIp())) { + fe.setIp(destIp); + needLog = true; + } + if (!Strings.isNullOrEmpty(destHostName) && !destHostName.equals(fe.getHostName())) { + fe.setHostName(destHostName); + BDBHA bdbha = (BDBHA) haProtocol; + bdbha.updateNodeAddress(fe.getNodeName(), destHostName, fe.getEditLogPort()); + needLog = true; + } + if (needLog) { + Env.getCurrentEnv().getEditLog().logModifyFrontend(fe); + } + } finally { + unlock(); + } + } + + public void dropFrontend(FrontendNodeType role, String ip, String hostname, int port) throws DdlException { + if (port == selfNode.getPort() && feType == FrontendNodeType.MASTER + && ((!Strings.isNullOrEmpty(selfNode.getHostName()) && selfNode.getHostName().equals(hostname)) + || ip.equals(selfNode.getIp()))) { throw new DdlException("can not drop current master node."); } if (!tryLock(false)) { throw new DdlException("Failed to acquire catalog lock. Try again"); } try { - Frontend fe = checkFeExist(host, port); + Frontend fe = checkFeExist(ip, hostname, port); if (fe == null) { - throw new DdlException("frontend does not exist[" + host + ":" + port + "]"); + throw new DdlException("frontend does not exist[" + ip + ":" + port + "]"); } if (fe.getRole() != role) { - throw new DdlException(role.toString() + " does not exist[" + host + ":" + port + "]"); + throw new DdlException(role.toString() + " does not exist[" + ip + ":" + port + "]"); } frontends.remove(fe.getNodeName()); removedFrontends.add(fe.getNodeName()); if (fe.getRole() == FrontendNodeType.FOLLOWER || fe.getRole() == FrontendNodeType.REPLICA) { haProtocol.removeElectableNode(fe.getNodeName()); - helperNodes.remove(Pair.of(host, port)); + removeHelperNode(ip, hostname, port); BDBHA ha = (BDBHA) haProtocol; ha.removeUnReadyElectableNode(nodeName, getFollowerCount()); } @@ -2513,22 +2548,33 @@ public class Env { } } - public Frontend checkFeExist(String host, int port) { + private void removeHelperNode(String ip, String hostName, int port) { + // ip may be changed, so we need use both ip and hostname to check. + // use node.getIdent() for simplicity here. + helperNodes.removeIf(node -> (node.getIp().equals(ip) + || node.getIdent().equals(hostName)) && node.getPort() == port); + } + + public Frontend checkFeExist(String ip, String hostName, int port) { for (Frontend fe : frontends.values()) { - if (fe.getHost().equals(host) && fe.getEditLogPort() == port) { + if (fe.getEditLogPort() != port) { + continue; + } + if (fe.getIp().equals(ip) + || (!Strings.isNullOrEmpty(fe.getHostName()) && fe.getHostName().equals(hostName))) { return fe; } } return null; } - public Frontend getFeByHost(String host) { + public Frontend getFeByIp(String ip) { for (Frontend fe : frontends.values()) { InetAddress hostAddr = null; InetAddress feAddr = null; try { - hostAddr = InetAddress.getByName(host); - feAddr = InetAddress.getByName(fe.getHost()); + hostAddr = InetAddress.getByName(ip); + feAddr = InetAddress.getByName(fe.getIp()); } catch (UnknownHostException e) { LOG.warn("get address failed: {}", e.getMessage()); return null; @@ -3209,7 +3255,7 @@ public class Env { public void replayAddFrontend(Frontend fe) { tryLock(true); try { - Frontend existFe = checkFeExist(fe.getHost(), fe.getEditLogPort()); + Frontend existFe = checkFeExist(fe.getIp(), fe.getHostName(), fe.getEditLogPort()); if (existFe != null) { LOG.warn("fe {} already exist.", existFe); if (existFe.getRole() != fe.getRole()) { @@ -3232,13 +3278,34 @@ public class Env { // DO NOT add helper sockets here, cause BDBHA is not instantiated yet. // helper sockets will be added after start BDBHA // But add to helperNodes, just for show - helperNodes.add(Pair.of(fe.getHost(), fe.getEditLogPort())); + helperNodes.add(new HostInfo(fe.getIp(), fe.getHostName(), fe.getEditLogPort())); } } finally { unlock(); } } + public void replayModifyFrontend(Frontend fe) { + tryLock(true); + try { + Frontend existFe = getFeByName(fe.getNodeName()); + if (existFe == null) { + // frontend may already be dropped. this may happen when + // drop and modify operations do not guarantee the order. + return; + } + // modify fe in frontends + existFe.setIp(fe.getIp()); + existFe.setHostName(fe.getHostName()); + // modify fe in helperNodes + helperNodes.stream().filter(n -> !Strings.isNullOrEmpty(n.getHostName()) + && n.getHostName().equals(fe.getHostName())) + .forEach(n -> n.ip = fe.getIp()); + } finally { + unlock(); + } + } + public void replayDropFrontend(Frontend frontend) { tryLock(true); try { @@ -3248,7 +3315,7 @@ public class Env { return; } if (removedFe.getRole() == FrontendNodeType.FOLLOWER || removedFe.getRole() == FrontendNodeType.REPLICA) { - helperNodes.remove(Pair.of(removedFe.getHost(), removedFe.getEditLogPort())); + removeHelperNode(removedFe.getIp(), removedFe.getHostName(), removedFe.getEditLogPort()); } removedFrontends.add(removedFe.getNodeName()); @@ -3505,16 +3572,16 @@ public class Env { return this.role; } - public Pair<String, Integer> getHelperNode() { + public HostInfo getHelperNode() { Preconditions.checkState(helperNodes.size() >= 1); return this.helperNodes.get(0); } - public List<Pair<String, Integer>> getHelperNodes() { + public List<HostInfo> getHelperNodes() { return Lists.newArrayList(helperNodes); } - public Pair<String, Integer> getSelfNode() { + public HostInfo getSelfNode() { return this.selfNode; } @@ -3530,21 +3597,28 @@ public class Env { if (!isReady()) { return 0; } - return this.masterRpcPort; + return this.masterInfo.getRpcPort(); } public int getMasterHttpPort() { if (!isReady()) { return 0; } - return this.masterHttpPort; + return this.masterInfo.getHttpPort(); } public String getMasterIp() { if (!isReady()) { return ""; } - return this.masterIp; + return this.masterInfo.getIp(); + } + + public String getMasterHostName() { + if (!isReady()) { + return ""; + } + return this.masterInfo.getHostName(); } public EsRepository getEsRepository() { @@ -3556,9 +3630,7 @@ public class Env { } public void setMaster(MasterInfo info) { - this.masterIp = info.getIp(); - this.masterHttpPort = info.getHttpPort(); - this.masterRpcPort = info.getRpcPort(); + this.masterInfo = info; } public boolean canRead() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/FrontendsProcNode.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/FrontendsProcNode.java index 362722d10f..8a83d50beb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/FrontendsProcNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/FrontendsProcNode.java @@ -24,6 +24,7 @@ import org.apache.doris.common.util.NetUtils; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.qe.ConnectContext; import org.apache.doris.system.Frontend; +import org.apache.doris.system.SystemInfoService.HostInfo; import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; @@ -31,6 +32,7 @@ import com.google.common.collect.Lists; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.List; @@ -85,12 +87,11 @@ public class FrontendsProcNode implements ProcNodeInterface { // get all node which are joined in bdb group List<InetSocketAddress> allFe = env.getHaProtocol().getElectableNodes(true /* include leader */); allFe.addAll(env.getHaProtocol().getObserverNodes()); - List<Pair<String, Integer>> allFeHosts = convertToHostPortPair(allFe); - List<Pair<String, Integer>> helperNodes = env.getHelperNodes(); + List<HostInfo> helperNodes = env.getHelperNodes(); // Because the `show frontend` stmt maybe forwarded from other FE. // if we only get self node from currrent catalog, the "CurrentConnected" field will always points to Msater FE. - String selfNode = Env.getCurrentEnv().getSelfNode().first; + String selfNode = Env.getCurrentEnv().getSelfNode().getIp(); if (ConnectContext.get() != null && !Strings.isNullOrEmpty(ConnectContext.get().getCurrentConnectedFEIp())) { selfNode = ConnectContext.get().getCurrentConnectedFEIp(); } @@ -99,13 +100,13 @@ public class FrontendsProcNode implements ProcNodeInterface { List<String> info = new ArrayList<String>(); info.add(fe.getNodeName()); - info.add(fe.getHost()); + info.add(fe.getIp()); - info.add(NetUtils.getHostnameByIp(fe.getHost())); + info.add(NetUtils.getHostnameByIp(fe.getIp())); info.add(Integer.toString(fe.getEditLogPort())); info.add(Integer.toString(Config.http_port)); - if (fe.getHost().equals(env.getSelfNode().first)) { + if (fe.getIp().equals(env.getSelfNode().getIp())) { info.add(Integer.toString(Config.query_port)); info.add(Integer.toString(Config.rpc_port)); } else { @@ -114,15 +115,15 @@ public class FrontendsProcNode implements ProcNodeInterface { } info.add(fe.getRole().name()); - InetSocketAddress socketAddress = new InetSocketAddress(fe.getHost(), fe.getEditLogPort()); + InetSocketAddress socketAddress = new InetSocketAddress(fe.getIp(), fe.getEditLogPort()); //An ipv6 address may have different format, so we compare InetSocketAddress objects instead of IP Strings. //e.g. fdbd:ff1:ce00:1c26::d8 and fdbd:ff1:ce00:1c26:0:0:d8 info.add(String.valueOf(socketAddress.equals(master))); info.add(Integer.toString(env.getClusterId())); - info.add(String.valueOf(isJoin(allFeHosts, fe))); + info.add(String.valueOf(isJoin(allFe, fe))); - if (fe.getHost().equals(env.getSelfNode().first)) { + if (fe.getIp().equals(env.getSelfNode().getIp())) { info.add("true"); info.add(Long.toString(env.getEditLog().getMaxJournalId())); } else { @@ -134,19 +135,35 @@ public class FrontendsProcNode implements ProcNodeInterface { info.add(fe.getHeartbeatErrMsg()); info.add(fe.getVersion()); // To indicate which FE we currently connected - info.add(fe.getHost().equals(selfNode) ? "Yes" : "No"); + info.add(fe.getIp().equals(selfNode) ? "Yes" : "No"); infos.add(info); } } - private static boolean isHelperNode(List<Pair<String, Integer>> helperNodes, Frontend fe) { - return helperNodes.stream().anyMatch(p -> p.first.equals(fe.getHost()) && p.second == fe.getEditLogPort()); + private static boolean isHelperNode(List<HostInfo> helperNodes, Frontend fe) { + return helperNodes.stream().anyMatch(p -> p.getIp().equals(fe.getIp()) && p.getPort() == fe.getEditLogPort()); } - private static boolean isJoin(List<Pair<String, Integer>> allFeHosts, Frontend fe) { - for (Pair<String, Integer> pair : allFeHosts) { - if (fe.getHost().equals(pair.first) && fe.getEditLogPort() == pair.second) { + private static boolean isJoin(List<InetSocketAddress> allFeHosts, Frontend fe) { + for (InetSocketAddress addr : allFeHosts) { + if (fe.getEditLogPort() != addr.getPort()) { + continue; + } + if (!Strings.isNullOrEmpty(addr.getHostName())) { + if (addr.getHostName().equals(fe.getHostName()) + || addr.getHostName().equals(fe.getIp())) { + return true; + } + } + // if hostname of InetSocketAddress is ip, addr.getHostName() may be not equal to fe.getIp() + // so we need to compare fe.getIp() with address.getHostAddress() + InetAddress address = addr.getAddress(); + if (null == address) { + LOG.warn("Failed to get InetAddress {}", addr); + continue; + } + if (fe.getIp().equals(address.getHostAddress())) { return true; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/telemetry/Telemetry.java b/fe/fe-core/src/main/java/org/apache/doris/common/telemetry/Telemetry.java index 887405f325..46d115c593 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/telemetry/Telemetry.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/telemetry/Telemetry.java @@ -70,7 +70,7 @@ public class Telemetry { throw new Exception("unknown value " + Config.trace_exporter + " of trace_exporter in fe.conf"); } - String serviceName = "FRONTEND:" + Env.getCurrentEnv().getSelfNode().first; + String serviceName = "FRONTEND:" + Env.getCurrentEnv().getSelfNode().getIp(); Resource serviceNameResource = Resource.create( Attributes.of(AttributeKey.stringKey("service.name"), serviceName)); // Send a batch of spans if ScheduleDelay time or MaxExportBatchSize is reached diff --git a/fe/fe-core/src/main/java/org/apache/doris/deploy/DeployManager.java b/fe/fe-core/src/main/java/org/apache/doris/deploy/DeployManager.java index bb25702357..e052654f8a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/deploy/DeployManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/deploy/DeployManager.java @@ -19,6 +19,7 @@ package org.apache.doris.deploy; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.FsBroker; +import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.Pair; @@ -28,6 +29,7 @@ import org.apache.doris.ha.FrontendNodeType; import org.apache.doris.system.Backend; import org.apache.doris.system.Frontend; import org.apache.doris.system.SystemInfoService; +import org.apache.doris.system.SystemInfoService.HostInfo; import com.google.common.base.Preconditions; import com.google.common.base.Strings; @@ -134,7 +136,8 @@ public class DeployManager extends MasterDaemon { // So we use this map to count the continuous detected down times, if the continuous down time is more // then MAX_MISSING_TIME, we considered this node as down permanently. protected Map<String, Integer> counterMap = Maps.newHashMap(); - protected static final Integer MAX_MISSING_TIME = 5; + // k8s pod delete and will recreate, so we need to wait for a while,otherwise we will drop node by mistake + protected static final Integer MAX_MISSING_TIME = 60; public DeployManager(Env env, long intervalMs) { super("deployManager", intervalMs); @@ -236,28 +239,20 @@ public class DeployManager extends MasterDaemon { throw new NotImplementedException(); } - public List<Pair<String, Integer>> getHelperNodes() { + public List<HostInfo> getHelperNodes() { String existFeHosts = System.getenv(ENV_FE_EXIST_ENDPOINT); if (!Strings.isNullOrEmpty(existFeHosts)) { // Some Frontends already exist in service group. // We consider them as helper node - List<Pair<String, Integer>> helperNodes = Lists.newArrayList(); + List<HostInfo> helperNodes = Lists.newArrayList(); String[] splittedHosts = existFeHosts.split(","); for (String host : splittedHosts) { - String[] splittedHostPort = host.split(":"); - if (splittedHostPort.length != 2) { - LOG.error("Invalid exist fe hosts: {}. will exit", existFeHosts); - System.exit(-1); - } - Integer port = -1; try { - port = Integer.valueOf(splittedHostPort[1]); - } catch (NumberFormatException e) { + helperNodes.add(SystemInfoService.getIpHostAndPort(host, true)); + } catch (AnalysisException e) { LOG.error("Invalid exist fe hosts: {}. will exit", existFeHosts); System.exit(-1); } - - helperNodes.add(Pair.of(splittedHostPort[0], port)); } return helperNodes; @@ -327,8 +322,8 @@ public class DeployManager extends MasterDaemon { LOG.info("sorted fe host list: {}", feHostInfos); // 4. return the first one as helper - return Lists.newArrayList(Pair.of(feHostInfos.get(0).getIp(), feHostInfos.get(0).getPort())); - + return Lists.newArrayList(new HostInfo(feHostInfos.get(0).getIp(), feHostInfos.get(0).getHostName(), + feHostInfos.get(0).getPort())); } @Override @@ -358,9 +353,8 @@ public class DeployManager extends MasterDaemon { } // 1.1 Check if self is in electable fe service group - // TODO(zd): 2023/2/17 Need to modify here when fe support FQDN (hostname will set to real hostname) SystemInfoService.HostInfo selfHostInfo = getFromHostInfos(remoteElectableFeHosts, - new SystemInfoService.HostInfo(env.getMasterIp(), null, Config.edit_log_port)); + new SystemInfoService.HostInfo(env.getMasterIp(), env.getMasterHostName(), Config.edit_log_port)); if (selfHostInfo == null) { // The running of this deploy manager means this node is considered self as Master. // If it self does not exist in electable fe service group, it should shut it self down. @@ -586,10 +580,10 @@ public class DeployManager extends MasterDaemon { try { switch (nodeType) { case ELECTABLE: - env.dropFrontend(FrontendNodeType.FOLLOWER, localIp, localPort); + env.dropFrontend(FrontendNodeType.FOLLOWER, localIp, localHostName, localPort); break; case OBSERVER: - env.dropFrontend(FrontendNodeType.OBSERVER, localIp, localPort); + env.dropFrontend(FrontendNodeType.OBSERVER, localIp, localHostName, localPort); break; case BACKEND: case BACKEND_CN: @@ -621,10 +615,10 @@ public class DeployManager extends MasterDaemon { try { switch (nodeType) { case ELECTABLE: - env.addFrontend(FrontendNodeType.FOLLOWER, remoteIp, remotePort); + env.addFrontend(FrontendNodeType.FOLLOWER, remoteIp, remoteHostName, remotePort); break; case OBSERVER: - env.addFrontend(FrontendNodeType.OBSERVER, remoteIp, remotePort); + env.addFrontend(FrontendNodeType.OBSERVER, remoteIp, remoteHostName, remotePort); break; case BACKEND: case BACKEND_CN: @@ -691,19 +685,23 @@ public class DeployManager extends MasterDaemon { return hostPortPair; } - // TODO: Need to modify here when fe support FQDN (hostname will set to real hostname) private SystemInfoService.HostInfo convertToHostInfo(Frontend frontend) { - return new SystemInfoService.HostInfo(frontend.getHost(), null, frontend.getEditLogPort()); + return new SystemInfoService.HostInfo(frontend.getIp(), frontend.getHostName(), frontend.getEditLogPort()); } private SystemInfoService.HostInfo convertToHostInfo(Backend backend) { return new SystemInfoService.HostInfo(backend.getIp(), backend.getHostName(), backend.getHeartbeatPort()); } - // TODO: Need to modify here when fe support FQDN(will check hostname?) private boolean isSelf(SystemInfoService.HostInfo hostInfo) { - if (env.getMasterIp().equals(hostInfo.getIp()) && Config.edit_log_port == hostInfo.getPort()) { - return true; + if (Config.edit_log_port == hostInfo.getPort()) { + // master host name may not same as local host name, so we should compare ip here + if (env.getMasterHostName() != null && env.getMasterHostName().equals(hostInfo.getHostName())) { + return true; + } + if (env.getMasterIp().equals(hostInfo.getIp())) { + return true; + } } return false; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/ha/BDBHA.java b/fe/fe-core/src/main/java/org/apache/doris/ha/BDBHA.java index 53b0f6758f..bb4d6020c7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/ha/BDBHA.java +++ b/fe/fe-core/src/main/java/org/apache/doris/ha/BDBHA.java @@ -221,6 +221,23 @@ public class BDBHA implements HAProtocol { return true; } + public boolean updateNodeAddress(String nodeName, String newHostName, int port) { + ReplicationGroupAdmin replicationGroupAdmin = environment.getReplicationGroupAdmin(); + if (replicationGroupAdmin == null) { + return false; + } + try { + replicationGroupAdmin.updateAddress(nodeName, newHostName, port); + } catch (MemberNotFoundException e) { + LOG.error("the updating electable node is not found {}", nodeName, e); + return false; + } catch (MasterStateException e) { + LOG.error("the updating electable node is master {}", nodeName, e); + return false; + } + return true; + } + // When new Follower FE is added to the cluster, it should also be added to the // helper sockets in // ReplicationGroupAdmin, in order to fix the following case: diff --git a/fe/fe-core/src/main/java/org/apache/doris/ha/MasterInfo.java b/fe/fe-core/src/main/java/org/apache/doris/ha/MasterInfo.java index 343731f036..624d733b2e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/ha/MasterInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/ha/MasterInfo.java @@ -17,8 +17,13 @@ package org.apache.doris.ha; +import org.apache.doris.catalog.Env; +import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; +import org.apache.doris.persist.gson.GsonUtils; + +import com.google.gson.annotations.SerializedName; import java.io.DataInput; import java.io.DataOutput; @@ -26,18 +31,25 @@ import java.io.IOException; public class MasterInfo implements Writable { + @SerializedName("ip") private String ip; + @SerializedName("hostName") + private String hostName; + @SerializedName("httpPort") private int httpPort; + @SerializedName("rpcPort") private int rpcPort; public MasterInfo() { this.ip = ""; + this.hostName = ""; this.httpPort = 0; this.rpcPort = 0; } - public MasterInfo(String ip, int httpPort, int rpcPort) { + public MasterInfo(String ip, String hostName, int httpPort, int rpcPort) { this.ip = ip; + this.hostName = hostName; this.httpPort = httpPort; this.rpcPort = rpcPort; } @@ -50,6 +62,14 @@ public class MasterInfo implements Writable { this.ip = ip; } + public String getHostName() { + return hostName; + } + + public void setHostName(String hostName) { + this.hostName = hostName; + } + public int getHttpPort() { return this.httpPort; } @@ -68,15 +88,25 @@ public class MasterInfo implements Writable { @Override public void write(DataOutput out) throws IOException { - Text.writeString(out, ip); - out.writeInt(httpPort); - out.writeInt(rpcPort); + String json = GsonUtils.GSON.toJson(this); + Text.writeString(out, json); } - public void readFields(DataInput in) throws IOException { + @Deprecated + private void readFields(DataInput in) throws IOException { ip = Text.readString(in); httpPort = in.readInt(); rpcPort = in.readInt(); } + public static MasterInfo read(DataInput in) throws IOException { + if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_118) { + MasterInfo masterInfo = new MasterInfo(); + masterInfo.readFields(in); + return masterInfo; + } + String json = Text.readString(in); + return GsonUtils.GSON.fromJson(json, MasterInfo.class); + } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/meta/MetaService.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/meta/MetaService.java index 15dab7d2e5..0c79a28d4e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/meta/MetaService.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/meta/MetaService.java @@ -54,13 +54,14 @@ public class MetaService extends RestBaseController { private static final String VERSION = "version"; private static final String HOST = "host"; + private static final String HOSTNAME = "hostname"; private static final String PORT = "port"; private File imageDir = MetaHelper.getMasterImageDir(); private boolean isFromValidFe(HttpServletRequest request) { String clientHost = request.getRemoteHost(); - Frontend fe = Env.getCurrentEnv().getFeByHost(clientHost); + Frontend fe = Env.getCurrentEnv().getFeByIp(clientHost); if (fe == null) { LOG.warn("request is not from valid FE. client: {}", clientHost); return false; @@ -184,12 +185,15 @@ public class MetaService extends RestBaseController { @RequestMapping(path = "/role", method = RequestMethod.GET) public Object role(HttpServletRequest request, HttpServletResponse response) throws DdlException { checkFromValidFe(request); - + // For upgrade compatibility, the host parameter name remains the same + // and the new hostname parameter is added. + // host = ip String host = request.getParameter(HOST); + String hostname = request.getParameter(HOSTNAME); String portString = request.getParameter(PORT); if (!Strings.isNullOrEmpty(host) && !Strings.isNullOrEmpty(portString)) { int port = Integer.parseInt(portString); - Frontend fe = Env.getCurrentEnv().checkFeExist(host, port); + Frontend fe = Env.getCurrentEnv().checkFeExist(host, hostname, port); if (fe == null) { response.setHeader("role", FrontendNodeType.UNKNOWN.name()); } else { diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/ClusterAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/ClusterAction.java index 983bafc852..48f6f02da8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/ClusterAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/ClusterAction.java @@ -60,7 +60,7 @@ public class ClusterAction extends RestBaseController { Map<String, List<String>> result = Maps.newHashMap(); List<String> frontends = Env.getCurrentEnv().getFrontends(null) .stream().filter(Frontend::isAlive) - .map(Frontend::getHost) + .map(Frontend::getIp) .collect(Collectors.toList()); result.put("mysql", frontends.stream().map(ip -> ip + ":" + Config.query_port).collect(Collectors.toList())); diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/HttpUtils.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/HttpUtils.java index dc238bfcec..b2bda47bd6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/HttpUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/HttpUtils.java @@ -49,7 +49,7 @@ public class HttpUtils { static List<Pair<String, Integer>> getFeList() { return Env.getCurrentEnv().getFrontends(null) - .stream().filter(Frontend::isAlive).map(fe -> Pair.of(fe.getHost(), Config.http_port)) + .stream().filter(Frontend::isAlive).map(fe -> Pair.of(fe.getIp(), Config.http_port)) .collect(Collectors.toList()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/NodeAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/NodeAction.java index 6694dc9998..dc87292a22 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/NodeAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/NodeAction.java @@ -21,7 +21,6 @@ import org.apache.doris.catalog.Env; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.ConfigBase; -import org.apache.doris.common.DdlException; import org.apache.doris.common.MarkedCountDownLatch; import org.apache.doris.common.Pair; import org.apache.doris.common.ThreadPoolManager; @@ -230,7 +229,7 @@ public class NodeAction extends RestBaseController { } private static List<String> getFeList() { - return Env.getCurrentEnv().getFrontends(null).stream().map(fe -> fe.getHost() + ":" + Config.http_port) + return Env.getCurrentEnv().getFrontends(null).stream().map(fe -> fe.getIp() + ":" + Config.http_port) .collect(Collectors.toList()); } @@ -483,7 +482,7 @@ public class NodeAction extends RestBaseController { List<Map<String, String>> failedTotal = Lists.newArrayList(); List<NodeConfigs> nodeConfigList = parseSetConfigNodes(requestBody, failedTotal); List<Pair<String, Integer>> aliveFe = Env.getCurrentEnv().getFrontends(null).stream().filter(Frontend::isAlive) - .map(fe -> Pair.of(fe.getHost(), Config.http_port)).collect(Collectors.toList()); + .map(fe -> Pair.of(fe.getIp(), Config.http_port)).collect(Collectors.toList()); checkNodeIsAlive(nodeConfigList, aliveFe, failedTotal); Map<String, String> header = Maps.newHashMap(); @@ -646,9 +645,6 @@ public class NodeAction extends RestBaseController { } try { String role = reqInfo.getRole(); - String[] split = reqInfo.getHostPort().split(":"); - String host = split[0]; - int port = Integer.parseInt(split[1]); Env currentEnv = Env.getCurrentEnv(); FrontendNodeType frontendNodeType; if (FrontendNodeType.FOLLOWER.name().equals(role)) { @@ -656,13 +652,14 @@ public class NodeAction extends RestBaseController { } else { frontendNodeType = FrontendNodeType.OBSERVER; } + HostInfo info = SystemInfoService.getIpHostAndPort(reqInfo.getHostPort(), true); if ("ADD".equals(action)) { - currentEnv.addFrontend(frontendNodeType, host, port); + currentEnv.addFrontend(frontendNodeType, info.getIp(), info.getHostName(), info.getPort()); } else if ("DROP".equals(action)) { - currentEnv.dropFrontend(frontendNodeType, host, port); + currentEnv.dropFrontend(frontendNodeType, info.getIp(), info.getHostName(), info.getPort()); } - } catch (DdlException ddlException) { - return ResponseEntityBuilder.okWithCommonError(ddlException.getMessage()); + } catch (UserException userException) { + return ResponseEntityBuilder.okWithCommonError(userException.getMessage()); } return ResponseEntityBuilder.ok(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/QueryProfileAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/QueryProfileAction.java index 862b56fe21..f0181e7a92 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/QueryProfileAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/QueryProfileAction.java @@ -175,7 +175,7 @@ public class QueryProfileAction extends RestBaseController { // add node information for (List<String> query : queries) { - query.add(1, Env.getCurrentEnv().getSelfNode().first + ":" + Config.http_port); + query.add(1, Env.getCurrentEnv().getSelfNode().getIp() + ":" + Config.http_port); } if (!Strings.isNullOrEmpty(search)) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java index 2cadc0efe1..839071e241 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java +++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java @@ -337,9 +337,9 @@ public class JournalEntity implements Writable { } case OperationType.OP_ADD_FRONTEND: case OperationType.OP_ADD_FIRST_FRONTEND: + case OperationType.OP_MODIFY_FRONTEND: case OperationType.OP_REMOVE_FRONTEND: { - data = new Frontend(); - ((Frontend) data).readFields(in); + data = Frontend.read(in); isRead = true; break; } @@ -375,8 +375,7 @@ public class JournalEntity implements Writable { break; } case OperationType.OP_MASTER_INFO_CHANGE: { - data = new MasterInfo(); - ((MasterInfo) data).readFields(in); + data = MasterInfo.read(in); isRead = true; break; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBEnvironment.java b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBEnvironment.java index 8ee4a21d88..a656b4196d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBEnvironment.java +++ b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBEnvironment.java @@ -183,7 +183,6 @@ public class BDBEnvironment { // start state change listener StateChangeListener listener = new BDBStateChangeListener(); replicatedEnvironment.setStateChangeListener(listener); - // open epochDB. the first parameter null means auto-commit epochDB = replicatedEnvironment.openDatabase(null, "epochDB", dbConfig); break; diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java index 38746bb4a7..44cb54d2da 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java +++ b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java @@ -18,7 +18,7 @@ package org.apache.doris.journal.bdbje; import org.apache.doris.catalog.Env; -import org.apache.doris.common.Pair; +import org.apache.doris.common.Config; import org.apache.doris.common.io.DataOutputBuffer; import org.apache.doris.common.io.Writable; import org.apache.doris.common.util.Util; @@ -27,6 +27,7 @@ import org.apache.doris.journal.JournalCursor; import org.apache.doris.journal.JournalEntity; import org.apache.doris.metric.MetricRepo; import org.apache.doris.persist.OperationType; +import org.apache.doris.system.SystemInfoService.HostInfo; import com.sleepycat.bind.tuple.TupleBinding; import com.sleepycat.je.Database; @@ -80,9 +81,17 @@ public class BDBJEJournal implements Journal { // CHECKSTYLE IGNORE THIS LINE: B */ private void initBDBEnv(String nodeName) { environmentPath = Env.getServingEnv().getBdbDir(); - Pair<String, Integer> selfNode = Env.getServingEnv().getSelfNode(); + HostInfo selfNode = Env.getServingEnv().getSelfNode(); selfNodeName = nodeName; - selfNodeHostPort = selfNode.first + ":" + selfNode.second; + if (Config.enable_fqdn_mode) { + // We use the hostname as the address of the bdbje node, + // so that we do not need to update bdbje when the IP changes. + // WARNING:However, it is necessary to ensure that the hostname of the node + // can be resolved and accessed by other nodes. + selfNodeHostPort = selfNode.getHostName() + ":" + selfNode.getPort(); + } else { + selfNodeHostPort = selfNode.getIp() + ":" + selfNode.getPort(); + } } /* @@ -299,8 +308,11 @@ public class BDBJEJournal implements Journal { // CHECKSTYLE IGNORE THIS LINE: B if (bdbEnvironment == null) { File dbEnv = new File(environmentPath); bdbEnvironment = new BDBEnvironment(); - Pair<String, Integer> helperNode = Env.getServingEnv().getHelperNode(); - String helperHostPort = helperNode.first + ":" + helperNode.second; + HostInfo helperNode = Env.getServingEnv().getHelperNode(); + String helperHostPort = helperNode.getIp() + ":" + helperNode.getPort(); + if (Config.enable_fqdn_mode) { + helperHostPort = helperNode.getHostName() + ":" + helperNode.getPort(); + } try { bdbEnvironment.setup(dbEnv, selfNodeName, selfNodeHostPort, helperHostPort, Env.getServingEnv().isElectable()); @@ -358,14 +370,14 @@ public class BDBJEJournal implements Journal { // CHECKSTYLE IGNORE THIS LINE: B // the files // ATTN: here we use `getServingEnv()`, because only serving catalog has // helper nodes. - Pair<String, Integer> helperNode = Env.getServingEnv().getHelperNode(); + HostInfo helperNode = Env.getServingEnv().getHelperNode(); NetworkRestore restore = new NetworkRestore(); NetworkRestoreConfig config = new NetworkRestoreConfig(); config.setRetainLogFiles(false); restore.execute(insufficientLogEx, config); bdbEnvironment.close(); bdbEnvironment.setup(new File(environmentPath), selfNodeName, selfNodeHostPort, - helperNode.first + ":" + helperNode.second, Env.getServingEnv().isElectable()); + helperNode.getIp() + ":" + helperNode.getPort(), Env.getServingEnv().isElectable()); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/Checkpoint.java b/fe/fe-core/src/main/java/org/apache/doris/master/Checkpoint.java index 6b68a270f3..36f8a53938 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/Checkpoint.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/Checkpoint.java @@ -185,7 +185,7 @@ public class Checkpoint extends MasterDaemon { if (!allFrontends.isEmpty()) { otherNodesCount = allFrontends.size() - 1; // skip master itself for (Frontend fe : allFrontends) { - String host = fe.getHost(); + String host = fe.getIp(); if (host.equals(Env.getServingEnv().getMasterIp())) { // skip master itself continue; @@ -227,7 +227,7 @@ public class Checkpoint extends MasterDaemon { long deleteVersion = storage.getLatestValidatedImageSeq(); if (successPushed > 0) { for (Frontend fe : allFrontends) { - String host = fe.getHost(); + String host = fe.getIp(); if (host.equals(Env.getServingEnv().getMasterIp())) { // skip master itself continue; diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java index 8d4d0212cd..7c27274530 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java @@ -402,6 +402,11 @@ public class EditLog { env.replayAddFrontend(fe); break; } + case OperationType.OP_MODIFY_FRONTEND: { + Frontend fe = (Frontend) journal.getData(); + env.replayModifyFrontend(fe); + break; + } case OperationType.OP_REMOVE_FRONTEND: { Frontend fe = (Frontend) journal.getData(); env.replayDropFrontend(fe); @@ -1236,6 +1241,10 @@ public class EditLog { logEdit(OperationType.OP_ADD_FIRST_FRONTEND, fe); } + public void logModifyFrontend(Frontend fe) { + logEdit(OperationType.OP_MODIFY_FRONTEND, fe); + } + public void logRemoveFrontend(Frontend fe) { logEdit(OperationType.OP_REMOVE_FRONTEND, fe); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java index a8dfb59078..102270a007 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java @@ -150,6 +150,8 @@ public class OperationType { public static final short OP_DROP_REPOSITORY = 90; public static final short OP_MODIFY_BACKEND = 91; + public static final short OP_MODIFY_FRONTEND = 92; + //colocate table public static final short OP_COLOCATE_ADD_TABLE = 94; public static final short OP_COLOCATE_REMOVE_TABLE = 95; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 73f7bcca55..39a7d05b32 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -497,7 +497,7 @@ public class StmtExecutor implements ProfileWriter { // If goes here, which means we can't find a valid Master FE(some error happens). // To avoid endless forward, throw exception here. throw new UserException("The statement has been forwarded to master FE(" - + Env.getCurrentEnv().getSelfNode().first + ") and failed to execute" + + Env.getCurrentEnv().getSelfNode().getIp() + ") and failed to execute" + " because Master FE is not ready. You may need to check FE's status"); } forwardToMaster(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendOptions.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendOptions.java index 67459f6eca..8a6f83cd83 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendOptions.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendOptions.java @@ -83,6 +83,8 @@ public class FrontendOptions { if (localAddr == null) { localAddr = loopBack; } + + checkHostName(); LOG.info("local address: {}.", localAddr); } @@ -94,10 +96,23 @@ public class FrontendOptions { return InetAddresses.toAddrString(localAddr); } - public static String getHostname() { + public static String getHostName() { return localAddr.getHostName(); } + private static void checkHostName() throws UnknownHostException { + if (Config.enable_fqdn_mode) { + if (getHostName().equals(getLocalHostAddress())) { + LOG.error("Can't get hostname in FQDN mode. Please check your network configuration." + + " got hostname: {}, ip: {}", + getHostName(), getLocalHostAddress()); + throw new UnknownHostException("Can't get hostname in FQDN mode." + + " Please check your network configuration." + + " got hostname: " + getHostName() + ", ip: " + getLocalHostAddress()); + } + } + } + private static void analyzePriorityCidrs() { String priorCidrs = Config.priority_networks; if (Strings.isNullOrEmpty(priorCidrs)) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 773bae3f7f..53fba90a2f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -786,7 +786,7 @@ public class FrontendServiceImpl implements FrontendService.Iface { public TMasterOpResult forward(TMasterOpRequest params) throws TException { TNetworkAddress clientAddr = getClientAddr(); if (clientAddr != null) { - Frontend fe = Env.getCurrentEnv().getFeByHost(clientAddr.getHostname()); + Frontend fe = Env.getCurrentEnv().getFeByIp(clientAddr.getHostname()); if (fe == null) { LOG.warn("reject request from invalid host. client: {}", clientAddr); throw new TException("request from invalid host was rejected."); diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/FQDNManager.java b/fe/fe-core/src/main/java/org/apache/doris/system/FQDNManager.java index 8f24f65de6..2f60d3b2b5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/FQDNManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/FQDNManager.java @@ -19,10 +19,12 @@ package org.apache.doris.system; import org.apache.doris.catalog.Env; import org.apache.doris.common.ClientPool; +import org.apache.doris.common.DdlException; import org.apache.doris.common.FeConstants; import org.apache.doris.common.util.MasterDaemon; import org.apache.doris.thrift.TNetworkAddress; +import com.google.common.base.Strings; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -40,10 +42,36 @@ public class FQDNManager extends MasterDaemon { } /** - * At each round: check if ip of be has already been changed + * At each round: check if ip of be or fe has already been changed */ @Override protected void runAfterCatalogReady() { + updateBeIp(); + updateFeIp(); + } + + private void updateFeIp() { + for (Frontend fe : Env.getCurrentEnv().getFrontends(null /* all */)) { + if (!Strings.isNullOrEmpty(fe.getHostName())) { + try { + InetAddress inetAddress = InetAddress.getByName(fe.getHostName()); + if (!fe.getIp().equalsIgnoreCase(inetAddress.getHostAddress())) { + String oldIp = fe.getIp(); + String newIp = inetAddress.getHostAddress(); + Env.getCurrentEnv().modifyFrontendIp(fe.getNodeName(), newIp); + LOG.warn("ip for {} of fe has been changed from {} to {}", + fe.getHostName(), oldIp, fe.getIp()); + } + } catch (UnknownHostException e) { + LOG.warn("unknown host name for fe, {}", fe.getHostName(), e); + } catch (DdlException e) { + LOG.warn("fail to update ip for fe, {}", fe.getHostName(), e); + } + } + } + } + + private void updateBeIp() { for (Backend be : nodeMgr.getIdToBackend().values()) { if (be.getHostName() != null) { try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Frontend.java b/fe/fe-core/src/main/java/org/apache/doris/system/Frontend.java index 88448e49c6..d1a7c55ae5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/Frontend.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/Frontend.java @@ -19,20 +19,32 @@ package org.apache.doris.system; import org.apache.doris.catalog.Env; import org.apache.doris.common.Config; +import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; import org.apache.doris.ha.BDBHA; import org.apache.doris.ha.FrontendNodeType; +import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.system.HeartbeatResponse.HbStatus; +import com.google.gson.annotations.SerializedName; + import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; public class Frontend implements Writable { + @SerializedName("role") private FrontendNodeType role; + // nodeName = ip:port_timestamp + @SerializedName("nodeName") private String nodeName; - private String host; + @SerializedName("ip") + private volatile String ip; + // used for getIpByHostname + @SerializedName("hostName") + private String hostName; + @SerializedName("editLogPort") private int editLogPort; private String version; @@ -47,10 +59,15 @@ public class Frontend implements Writable { public Frontend() {} - public Frontend(FrontendNodeType role, String nodeName, String host, int editLogPort) { + public Frontend(FrontendNodeType role, String nodeName, String ip, int editLogPort) { + this(role, nodeName, ip, "", editLogPort); + } + + public Frontend(FrontendNodeType role, String nodeName, String ip, String hostName, int editLogPort) { this.role = role; this.nodeName = nodeName; - this.host = host; + this.ip = ip; + this.hostName = hostName; this.editLogPort = editLogPort; } @@ -58,14 +75,22 @@ public class Frontend implements Writable { return this.role; } - public String getHost() { - return this.host; + public String getIp() { + return this.ip; } public String getVersion() { return version; } + public String getHostName() { + return hostName; + } + + public void setHostName(String hostName) { + this.hostName = hostName; + } + public String getNodeName() { return nodeName; } @@ -131,35 +156,43 @@ public class Frontend implements Writable { @Override public void write(DataOutput out) throws IOException { - Text.writeString(out, role.name()); - Text.writeString(out, host); - out.writeInt(editLogPort); - Text.writeString(out, nodeName); + String json = GsonUtils.GSON.toJson(this); + Text.writeString(out, json); } - public void readFields(DataInput in) throws IOException { + @Deprecated + private void readFields(DataInput in) throws IOException { role = FrontendNodeType.valueOf(Text.readString(in)); if (role == FrontendNodeType.REPLICA) { // this is for compatibility. // we changed REPLICA to FOLLOWER role = FrontendNodeType.FOLLOWER; } - host = Text.readString(in); + ip = Text.readString(in); editLogPort = in.readInt(); nodeName = Text.readString(in); } public static Frontend read(DataInput in) throws IOException { - Frontend frontend = new Frontend(); - frontend.readFields(in); - return frontend; + if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_118) { + Frontend frontend = new Frontend(); + frontend.readFields(in); + return frontend; + } + String json = Text.readString(in); + return GsonUtils.GSON.fromJson(json, Frontend.class); } @Override public String toString() { StringBuilder sb = new StringBuilder(); sb.append("name: ").append(nodeName).append(", role: ").append(role.name()); - sb.append(", ").append(host).append(":").append(editLogPort); + sb.append(", hostname: ").append(hostName); + sb.append(", ").append(ip).append(":").append(editLogPort); return sb.toString(); } + + public void setIp(String ip) { + this.ip = ip; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java index ef0fc059cb..d339699521 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java @@ -29,6 +29,7 @@ import org.apache.doris.persist.HbPackage; import org.apache.doris.resource.Tag; import org.apache.doris.service.FrontendOptions; import org.apache.doris.system.HeartbeatResponse.HbStatus; +import org.apache.doris.system.SystemInfoService.HostInfo; import org.apache.doris.thrift.FrontendService; import org.apache.doris.thrift.HeartbeatService; import org.apache.doris.thrift.TBackendInfo; @@ -293,7 +294,9 @@ public class HeartbeatMgr extends MasterDaemon { @Override public HeartbeatResponse call() { - if (fe.getHost().equals(Env.getCurrentEnv().getSelfNode().first)) { + HostInfo selfNode = Env.getCurrentEnv().getSelfNode(); + if (fe.getIp().equals(selfNode.getIp()) + || (!Strings.isNullOrEmpty(fe.getHostName()) && fe.getHostName().equals(selfNode.getHostName()))) { // heartbeat to self if (Env.getCurrentEnv().isReady()) { return new FrontendHbResponse(fe.getNodeName(), Config.query_port, Config.rpc_port, @@ -309,7 +312,7 @@ public class HeartbeatMgr extends MasterDaemon { private HeartbeatResponse getHeartbeatResponse() { FrontendService.Client client = null; - TNetworkAddress addr = new TNetworkAddress(fe.getHost(), Config.rpc_port); + TNetworkAddress addr = new TNetworkAddress(fe.getIp(), Config.rpc_port); boolean ok = false; try { client = ClientPool.frontendHeartbeatPool.borrowObject(addr); diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java index c1048adbc8..abb08dcb96 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java @@ -61,6 +61,7 @@ import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; @@ -127,6 +128,33 @@ public class SystemInfoService { return res; } + public boolean isSame(HostInfo other) { + if (other.getPort() != port) { + return false; + } + if (hostName != null && hostName.equals(other.getHostName())) { + return true; + } + if (ip != null && ip.equals(other.getIp())) { + return true; + } + return false; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + HostInfo that = (HostInfo) o; + return Objects.equals(ip, that.getIp()) + && Objects.equals(hostName, that.getHostName()) + && Objects.equals(port, that.getPort()); + } + @Override public String toString() { return "HostInfo{" @@ -168,8 +196,8 @@ public class SystemInfoService { if (!Config.enable_fqdn_mode) { hostInfo.setHostName(null); } - if (Config.enable_fqdn_mode && hostInfo.getHostName() == null) { - throw new DdlException("backend's hostName should not be null while enable_fqdn_mode is true"); + if (Config.enable_fqdn_mode && StringUtils.isEmpty(hostInfo.getHostName())) { + throw new DdlException("backend's hostName should not be empty while enable_fqdn_mode is true"); } // check is already exist if (getBackendWithHeartbeatPort(hostInfo.getIp(), hostInfo.getHostName(), hostInfo.getPort()) != null) { @@ -355,20 +383,20 @@ public class SystemInfoService { return null; } - public Backend getBackendWithBePort(String host, int bePort) { + public Backend getBackendWithBePort(String ip, int bePort) { ImmutableMap<Long, Backend> idToBackend = idToBackendRef; for (Backend backend : idToBackend.values()) { - if (backend.getIp().equals(host) && backend.getBePort() == bePort) { + if (backend.getIp().equals(ip) && backend.getBePort() == bePort) { return backend; } } return null; } - public Backend getBackendWithHttpPort(String host, int httpPort) { + public Backend getBackendWithHttpPort(String ip, int httpPort) { ImmutableMap<Long, Backend> idToBackend = idToBackendRef; for (Backend backend : idToBackend.values()) { - if (backend.getIp().equals(host) && backend.getHttpPort() == httpPort) { + if (backend.getIp().equals(ip) && backend.getHttpPort() == httpPort) { return backend; } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/system/FQDNManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/system/FQDNManagerTest.java index 3103edf21f..88647c45aa 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/system/FQDNManagerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/system/FQDNManagerTest.java @@ -19,6 +19,8 @@ package org.apache.doris.system; import org.apache.doris.catalog.Env; import org.apache.doris.common.Config; +import org.apache.doris.ha.BDBHA; +import org.apache.doris.ha.FrontendNodeType; import mockit.Expectations; import mockit.Mock; @@ -30,6 +32,8 @@ import org.junit.Test; import java.net.InetAddress; import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; public class FQDNManagerTest { @Mocked @@ -42,6 +46,11 @@ public class FQDNManagerTest { private SystemInfoService systemInfoService; + List<Frontend> frontends = new ArrayList<>(); + + @Mocked + private BDBHA bdbha; + @Before public void setUp() throws UnknownHostException { new MockUp<InetAddress>(InetAddress.class) { @@ -56,8 +65,41 @@ public class FQDNManagerTest { public Env getServingEnv() { return env; } + + @Mock + public Env getCurrentEnv() { + return env; + } + + @Mock + public boolean tryLock() { + return true; + } + + @Mock + public void modifyFrontendIp(String nodeName, String ip) { + for (Frontend fe : frontends) { + if (fe.getNodeName().equals(nodeName)) { + fe.setIp(ip); + } + } + } + }; + + new MockUp<BDBHA>(BDBHA.class) { + @Mock + public boolean updateNodeAddress(String nodeName, String ip, int port) { + return true; + } }; + Config.enable_fqdn_mode = true; + systemInfoService = new SystemInfoService(); + systemInfoService.addBackend(new Backend(1, "193.88.67.98", "doris.test.domain", 9090)); + frontends.add(new Frontend(FrontendNodeType.FOLLOWER, "doris.test.domain_9010_1675168383846", + "193.88.67.90", "doris.test.domain", 9010)); + fdqnManager = new FQDNManager(systemInfoService); + new Expectations() { { env.isReady(); @@ -67,13 +109,20 @@ public class FQDNManagerTest { inetAddress.getHostAddress(); minTimes = 0; result = "193.88.67.99"; + + env.getFrontends(null); + minTimes = 0; + result = frontends; + + env.getFeByName("doris.test.domain_9010_1675168383846"); + minTimes = 0; + result = frontends.get(0); + + env.getHaProtocol(); + minTimes = 0; + result = bdbha; } }; - - Config.enable_fqdn_mode = true; - systemInfoService = new SystemInfoService(); - systemInfoService.addBackend(new Backend(1, "193.88.67.98", "doris.test.domain", 9090)); - fdqnManager = new FQDNManager(systemInfoService); } @Test @@ -84,4 +133,14 @@ public class FQDNManagerTest { Assert.assertEquals("193.88.67.99", systemInfoService.getBackend(1).getIp()); fdqnManager.exit(); } + + @Test + public void testFrontendChanged() throws InterruptedException { + // frontend ip changed + Assert.assertEquals("193.88.67.90", env.getFrontends(null).get(0).getIp()); + fdqnManager.start(); + Thread.sleep(1000); + Assert.assertEquals("193.88.67.99", env.getFrontends(null).get(0).getIp()); + fdqnManager.exit(); + } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/system/HeartbeatMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/system/HeartbeatMgrTest.java index 943cf462bb..e9108d2c6b 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/system/HeartbeatMgrTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/system/HeartbeatMgrTest.java @@ -20,11 +20,11 @@ package org.apache.doris.system; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.FsBroker; import org.apache.doris.common.GenericPool; -import org.apache.doris.common.Pair; import org.apache.doris.ha.FrontendNodeType; import org.apache.doris.system.HeartbeatMgr.BrokerHeartbeatHandler; import org.apache.doris.system.HeartbeatMgr.FrontendHeartbeatHandler; import org.apache.doris.system.HeartbeatResponse.HbStatus; +import org.apache.doris.system.SystemInfoService.HostInfo; import org.apache.doris.thrift.FrontendService; import org.apache.doris.thrift.TBrokerOperationStatus; import org.apache.doris.thrift.TBrokerOperationStatusCode; @@ -55,7 +55,7 @@ public class HeartbeatMgrTest { { env.getSelfNode(); minTimes = 0; - result = Pair.of("192.168.1.3", 9010); // not self + result = new HostInfo("192.168.1.3", null, 9010); // not self env.isReady(); minTimes = 0; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org