This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new a81db3e984 [improvement](FQDN) broker support fqdn (#19821) a81db3e984 is described below commit a81db3e984f5929880e554b97c8c64464b0ab5e6 Author: zhangdong <493738...@qq.com> AuthorDate: Sat May 20 11:25:58 2023 +0800 [improvement](FQDN) broker support fqdn (#19821) 1.broker support fqdn 2.change 'master_only' attr of 'enable_fqdn_mode' --- .../src/main/java/org/apache/doris/common/Config.java | 2 +- .../java/org/apache/doris/analysis/ShowBrokerStmt.java | 4 ---- .../main/java/org/apache/doris/catalog/BrokerMgr.java | 14 ++++++-------- .../main/java/org/apache/doris/catalog/FsBroker.java | 18 +++++++++--------- .../java/org/apache/doris/common/util/BrokerUtil.java | 2 +- .../java/org/apache/doris/deploy/DeployManager.java | 2 +- .../org/apache/doris/fs/remote/BrokerFileSystem.java | 2 +- .../org/apache/doris/load/loadv2/SparkLoadJob.java | 4 ++-- .../main/java/org/apache/doris/planner/ExportSink.java | 2 +- .../apache/doris/planner/external/FileGroupInfo.java | 2 +- .../doris/planner/external/FileQueryScanNode.java | 2 +- .../src/main/java/org/apache/doris/qe/Coordinator.java | 2 +- .../main/java/org/apache/doris/qe/ShowExecutor.java | 4 ---- .../java/org/apache/doris/system/HeartbeatMgr.java | 10 +++++----- .../main/java/org/apache/doris/task/DownloadTask.java | 2 +- .../main/java/org/apache/doris/task/UploadTask.java | 2 +- .../java/org/apache/doris/persist/FsBrokerTest.java | 4 ++-- 17 files changed, 34 insertions(+), 44 deletions(-) diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 9922034adc..70835f0e57 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -1756,7 +1756,7 @@ public class Config extends ConfigBase { * When enable_fqdn_mode is true, the name of the pod where be is located will remain unchanged * after reconstruction, while the ip can be changed. */ - @ConfField(mutable = false, masterOnly = true, expType = ExperimentalType.EXPERIMENTAL) + @ConfField(mutable = false, expType = ExperimentalType.EXPERIMENTAL) public static boolean enable_fqdn_mode = false; /** diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowBrokerStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowBrokerStmt.java index 9312d99e50..2ef68cd149 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowBrokerStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowBrokerStmt.java @@ -45,10 +45,6 @@ public class ShowBrokerStmt extends ShowStmt { public ShowResultSetMetaData getMetaData() { ShowResultSetMetaData.Builder builder = ShowResultSetMetaData.builder(); for (String title : BrokerMgr.BROKER_PROC_NODE_TITLE_NAMES) { - if (title.equals(BrokerMgr.BROKER_PROC_NODE_TITLE_NAMES.get(BrokerMgr.HOSTNAME_INDEX))) { - // SHOW BROKER does not show hostname - continue; - } builder.addColumn(new Column(title, ScalarType.createVarchar(30))); } return builder.build(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/BrokerMgr.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/BrokerMgr.java index 28a679b9a1..39c96c358d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/BrokerMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/BrokerMgr.java @@ -27,7 +27,6 @@ import org.apache.doris.common.io.Writable; import org.apache.doris.common.proc.BaseProcResult; import org.apache.doris.common.proc.ProcNodeInterface; import org.apache.doris.common.proc.ProcResult; -import org.apache.doris.common.util.NetUtils; import org.apache.doris.common.util.TimeUtils; import com.google.common.collect.ArrayListMultimap; @@ -50,7 +49,7 @@ import java.util.concurrent.locks.ReentrantLock; */ public class BrokerMgr { public static final ImmutableList<String> BROKER_PROC_NODE_TITLE_NAMES = new ImmutableList.Builder<String>() - .add("Name").add("IP").add("HostName").add("Port").add("Alive") + .add("Name").add("Host").add("Port").add("Alive") .add("LastStartTime").add("LastUpdateTime").add("ErrMsg") .build(); @@ -228,7 +227,7 @@ public class BrokerMgr { } Env.getCurrentEnv().getEditLog().logAddBroker(new ModifyBrokerInfo(name, addedBrokerAddress)); for (FsBroker address : addedBrokerAddress) { - brokerAddrsMap.put(address.ip, address); + brokerAddrsMap.put(address.host, address); } brokersMap.put(name, brokerAddrsMap); brokerListMap.put(name, Lists.newArrayList(brokerAddrsMap.values())); @@ -246,7 +245,7 @@ public class BrokerMgr { brokersMap.put(name, brokerAddrsMap); } for (FsBroker address : addresses) { - brokerAddrsMap.put(address.ip, address); + brokerAddrsMap.put(address.host, address); } brokerListMap.put(name, Lists.newArrayList(brokerAddrsMap.values())); @@ -280,7 +279,7 @@ public class BrokerMgr { } Env.getCurrentEnv().getEditLog().logDropBroker(new ModifyBrokerInfo(name, droppedAddressList)); for (FsBroker address : droppedAddressList) { - brokerAddrsMap.remove(address.ip, address); + brokerAddrsMap.remove(address.host, address); } brokerListMap.put(name, Lists.newArrayList(brokerAddrsMap.values())); @@ -294,7 +293,7 @@ public class BrokerMgr { try { ArrayListMultimap<String, FsBroker> brokerAddrsMap = brokersMap.get(name); for (FsBroker addr : addresses) { - brokerAddrsMap.remove(addr.ip, addr); + brokerAddrsMap.remove(addr.host, addr); } brokerListMap.put(name, Lists.newArrayList(brokerAddrsMap.values())); @@ -364,8 +363,7 @@ public class BrokerMgr { for (FsBroker broker : entry.getValue().values()) { List<String> row = Lists.newArrayList(); row.add(brokerName); - row.add(broker.ip); - row.add(NetUtils.getHostnameByIp(broker.ip)); + row.add(broker.host); row.add(String.valueOf(broker.port)); row.add(String.valueOf(broker.isAlive)); row.add(TimeUtils.longToTimeString(broker.lastStartTime)); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/FsBroker.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/FsBroker.java index 2d3a51d468..fd40606dde 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/FsBroker.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/FsBroker.java @@ -30,8 +30,8 @@ import java.io.DataOutput; import java.io.IOException; public class FsBroker implements Writable, Comparable<FsBroker> { - @SerializedName(value = "ip") - public String ip; + @SerializedName(value = "host", alternate = {"ip"}) + public String host; @SerializedName(value = "port") public int port; // msg for ping result @@ -46,8 +46,8 @@ public class FsBroker implements Writable, Comparable<FsBroker> { public FsBroker() { } - public FsBroker(String ip, int port) { - this.ip = ip; + public FsBroker(String host, int port) { + this.host = host; this.port = port; } @@ -90,19 +90,19 @@ public class FsBroker implements Writable, Comparable<FsBroker> { FsBroker other = (FsBroker) o; return port == other.port - && ip.equals(other.ip); + && host.equals(other.host); } @Override public int hashCode() { - int result = ip.hashCode(); + int result = host.hashCode(); result = 31 * result + port; return result; } @Override public int compareTo(FsBroker o) { - int ret = ip.compareTo(o.ip); + int ret = host.compareTo(o.host); if (ret != 0) { return ret; } @@ -117,13 +117,13 @@ public class FsBroker implements Writable, Comparable<FsBroker> { @Deprecated private void readFields(DataInput in) throws IOException { - ip = Text.readString(in); + host = Text.readString(in); port = in.readInt(); } @Override public String toString() { - return ip + ":" + port; + return host + ":" + port; } public static FsBroker readIn(DataInput in) throws IOException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java index 0444abf443..d19693e9f4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java @@ -441,7 +441,7 @@ public class BrokerUtil { } catch (AnalysisException e) { throw new UserException(e.getMessage()); } - return new TNetworkAddress(broker.ip, broker.port); + return new TNetworkAddress(broker.host, broker.port); } public static TPaloBrokerService.Client borrowClient(TNetworkAddress address) throws UserException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/deploy/DeployManager.java b/fe/fe-core/src/main/java/org/apache/doris/deploy/DeployManager.java index 19a497adf6..73f51beb5d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/deploy/DeployManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/deploy/DeployManager.java @@ -604,7 +604,7 @@ public class DeployManager extends MasterDaemon { } private HostInfo convertToHostInfo(FsBroker broker) { - return new HostInfo(broker.ip, broker.port); + return new HostInfo(broker.host, broker.port); } private HostInfo convertToHostInfo(Backend backend) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/BrokerFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/BrokerFileSystem.java index a2eb5560a4..1373ac1d6d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/BrokerFileSystem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/BrokerFileSystem.java @@ -88,7 +88,7 @@ public class BrokerFileSystem extends RemoteFileSystem { LOG.warn("failed to get a broker address: " + e.getMessage()); return null; } - TNetworkAddress address = new TNetworkAddress(broker.ip, broker.port); + TNetworkAddress address = new TNetworkAddress(broker.host, broker.port); TPaloBrokerService.Client client; try { client = ClientPool.brokerPool.borrowObject(address); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java index 097fe51b2a..71e01c6af4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java @@ -504,11 +504,11 @@ public class SparkLoadJob extends BulkLoadJob { FsBroker fsBroker = Env.getCurrentEnv().getBrokerMgr().getBroker( brokerDesc.getName(), backend.getHost()); tBrokerScanRange.getBrokerAddresses().add( - new TNetworkAddress(fsBroker.ip, fsBroker.port)); + new TNetworkAddress(fsBroker.host, fsBroker.port)); LOG.debug("push task for replica {}, broker {}:{}," + " backendId {}, filePath {}, fileSize {}", - replicaId, fsBroker.ip, + replicaId, fsBroker.host, fsBroker.port, backendId, tBrokerRangeDesc.path, tBrokerRangeDesc.file_size); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ExportSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ExportSink.java index b4b86c165f..0fd2535b62 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/ExportSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ExportSink.java @@ -79,7 +79,7 @@ public class ExportSink extends DataSink { if (brokerDesc.getFileType() == TFileType.FILE_BROKER) { FsBroker broker = Env.getCurrentEnv().getBrokerMgr().getAnyBroker(brokerDesc.getName()); if (broker != null) { - tExportSink.addToBrokerAddresses(new TNetworkAddress(broker.ip, broker.port)); + tExportSink.addToBrokerAddresses(new TNetworkAddress(broker.host, broker.port)); } } tExportSink.setProperties(brokerDesc.getProperties()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileGroupInfo.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileGroupInfo.java index 7808c35ac6..f626bb71c2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileGroupInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileGroupInfo.java @@ -263,7 +263,7 @@ public class FileGroupInfo { } catch (AnalysisException e) { throw new UserException(e.getMessage()); } - params.addToBrokerAddresses(new TNetworkAddress(broker.ip, broker.port)); + params.addToBrokerAddresses(new TNetworkAddress(broker.host, broker.port)); } else { params.setBrokerAddresses(new ArrayList<>()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java index 9c5f0f4fbf..afcef39c29 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java @@ -235,7 +235,7 @@ public abstract class FileQueryScanNode extends FileScanNode { if (broker == null) { throw new UserException("No alive broker."); } - params.addToBrokerAddresses(new TNetworkAddress(broker.ip, broker.port)); + params.addToBrokerAddresses(new TNetworkAddress(broker.host, broker.port)); } } else if (locationType == TFileType.FILE_S3) { params.setProperties(locationProperties); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index c8dbd2d5d9..956c40c633 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -600,7 +600,7 @@ public class Coordinator { ResultFileSink topResultFileSink = (ResultFileSink) topDataSink; FsBroker broker = Env.getCurrentEnv().getBrokerMgr() .getBroker(topResultFileSink.getBrokerName(), execBeAddr.getHostname()); - topResultFileSink.setBrokerAddr(broker.ip, broker.port); + topResultFileSink.setBrokerAddr(broker.host, broker.port); } } else { // This is a load process. diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java index 52b6643cda..8f440cb750 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java @@ -107,7 +107,6 @@ import org.apache.doris.backup.BackupJob; import org.apache.doris.backup.Repository; import org.apache.doris.backup.RestoreJob; import org.apache.doris.blockrule.SqlBlockRule; -import org.apache.doris.catalog.BrokerMgr; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.DatabaseIf; @@ -1754,9 +1753,6 @@ public class ShowExecutor { private void handleShowBroker() { ShowBrokerStmt showStmt = (ShowBrokerStmt) stmt; List<List<String>> brokersInfo = Env.getCurrentEnv().getBrokerMgr().getBrokersInfo(); - for (List<String> row : brokersInfo) { - row.remove(BrokerMgr.HOSTNAME_INDEX); - } // Only success resultSet = new ShowResultSet(showStmt.getMetaData(), brokersInfo); diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java index dac2f46afc..ffd0269070 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java @@ -187,7 +187,7 @@ public class HeartbeatMgr extends MasterDaemon { boolean isChanged = broker.handleHbResponse(hbResponse); if (hbResponse.getStatus() != HbStatus.OK) { // invalid all connections cached in ClientPool - ClientPool.brokerPool.clearPool(new TNetworkAddress(broker.ip, broker.port)); + ClientPool.brokerPool.clearPool(new TNetworkAddress(broker.host, broker.port)); } return isChanged; } @@ -352,7 +352,7 @@ public class HeartbeatMgr extends MasterDaemon { @Override public HeartbeatResponse call() { TPaloBrokerService.Client client = null; - TNetworkAddress addr = new TNetworkAddress(broker.ip, broker.port); + TNetworkAddress addr = new TNetworkAddress(broker.host, broker.port); boolean ok = false; try { client = ClientPool.brokerPool.borrowObject(addr); @@ -362,13 +362,13 @@ public class HeartbeatMgr extends MasterDaemon { ok = true; if (status.getStatusCode() != TBrokerOperationStatusCode.OK) { - return new BrokerHbResponse(brokerName, broker.ip, broker.port, status.getMessage()); + return new BrokerHbResponse(brokerName, broker.host, broker.port, status.getMessage()); } else { - return new BrokerHbResponse(brokerName, broker.ip, broker.port, System.currentTimeMillis()); + return new BrokerHbResponse(brokerName, broker.host, broker.port, System.currentTimeMillis()); } } catch (Exception e) { - return new BrokerHbResponse(brokerName, broker.ip, broker.port, + return new BrokerHbResponse(brokerName, broker.host, broker.port, Strings.isNullOrEmpty(e.getMessage()) ? "got exception" : e.getMessage()); } finally { if (ok) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/DownloadTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/DownloadTask.java index 133614e964..64b75a70d3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/DownloadTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/DownloadTask.java @@ -64,7 +64,7 @@ public class DownloadTask extends AgentTask { } public TDownloadReq toThrift() { - TNetworkAddress address = new TNetworkAddress(brokerAddr.ip, brokerAddr.port); + TNetworkAddress address = new TNetworkAddress(brokerAddr.host, brokerAddr.port); TDownloadReq req = new TDownloadReq(jobId, srcToDestPath, address); req.setBrokerProp(brokerProperties); req.setStorageBackend(storageType.toThrift()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/UploadTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/UploadTask.java index a66a6b2b48..f41d722825 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/UploadTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/UploadTask.java @@ -65,7 +65,7 @@ public class UploadTask extends AgentTask { } public TUploadReq toThrift() { - TNetworkAddress address = new TNetworkAddress(broker.ip, broker.port); + TNetworkAddress address = new TNetworkAddress(broker.host, broker.port); TUploadReq request = new TUploadReq(jobId, srcToDestPath, address); request.setBrokerProp(brokerProperties); request.setStorageBackend(storageType.toThrift()); diff --git a/fe/fe-core/src/test/java/org/apache/doris/persist/FsBrokerTest.java b/fe/fe-core/src/test/java/org/apache/doris/persist/FsBrokerTest.java index 32a45cbfee..a5acc45bba 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/persist/FsBrokerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/persist/FsBrokerTest.java @@ -71,7 +71,7 @@ public class FsBrokerTest { DataInputStream dis = new DataInputStream(new FileInputStream(file)); FsBroker readBroker = FsBroker.readIn(dis); - Assert.assertEquals(fsBroker.ip, readBroker.ip); + Assert.assertEquals(fsBroker.host, readBroker.host); Assert.assertEquals(fsBroker.port, readBroker.port); Assert.assertEquals(fsBroker.isAlive, readBroker.isAlive); Assert.assertTrue(fsBroker.isAlive); @@ -98,7 +98,7 @@ public class FsBrokerTest { DataInputStream dis = new DataInputStream(new FileInputStream(file)); FsBroker readBroker = FsBroker.readIn(dis); - Assert.assertEquals(fsBroker.ip, readBroker.ip); + Assert.assertEquals(fsBroker.host, readBroker.host); Assert.assertEquals(fsBroker.port, readBroker.port); Assert.assertEquals(fsBroker.isAlive, readBroker.isAlive); Assert.assertFalse(fsBroker.isAlive); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org