shengbinxu commented on issue #27899: URL: https://github.com/apache/doris/issues/27899#issuecomment-2235087253
还有一个方法: https://doris.apache.org/zh-CN/docs/2.0/data-operate/import/stream-load-manual <img width="1135" alt="image" src="https://github.com/user-attachments/assets/56d5c8ca-caea-4d46-b7b2-eac01c75f1f7"> 客户端直接请求BE。 咱把所有BE节点的地址(这里的地址可以是nginx代理的地址,然后设置IP白名单保护BE)写死到配置中,每次请求前,做一个探活,哪个地址可用就请求哪个。 flink doris connector就是这么处理的。 参考: https://github.com/apache/doris-flink-connector/blob/8ba89c4488bb7b639566b519da58c698f1e13b2f/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java#L80 ``` package org.apache.doris.flink.sink; import org.apache.flink.annotation.VisibleForTesting; import org.apache.commons.lang3.StringUtils; import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.cfg.DorisReadOptions; import org.apache.doris.flink.exception.DorisRuntimeException; import org.apache.doris.flink.rest.RestService; import org.apache.doris.flink.rest.models.BackendV2; import org.apache.doris.flink.rest.models.BackendV2.BackendRowV2; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.net.HttpURLConnection; import java.net.URL; import java.util.ArrayList; import java.util.Arrays; import java.util.List; public class BackendUtil { private static final Logger LOG = LoggerFactory.getLogger(BackendUtil.class); private final List<BackendV2.BackendRowV2> backends; private long pos; public BackendUtil(List<BackendV2.BackendRowV2> backends) { this.backends = backends; this.pos = 0; } public BackendUtil(String beNodes) { this.backends = initBackends(beNodes); this.pos = 0; } private List<BackendV2.BackendRowV2> initBackends(String beNodes) { List<BackendV2.BackendRowV2> backends = new ArrayList<>(); List<String> nodes = Arrays.asList(beNodes.split(",")); nodes.forEach( node -> { if (tryHttpConnection(node)) { node = node.trim(); String[] ipAndPort = node.split(":"); BackendRowV2 backendRowV2 = new BackendRowV2(); backendRowV2.setIp(ipAndPort[0]); backendRowV2.setHttpPort(Integer.parseInt(ipAndPort[1])); backendRowV2.setAlive(true); backends.add(backendRowV2); } }); return backends; } public static BackendUtil getInstance( DorisOptions dorisOptions, DorisReadOptions readOptions, Logger logger) { if (StringUtils.isNotEmpty(dorisOptions.getBenodes())) { return new BackendUtil(dorisOptions.getBenodes()); } else { return new BackendUtil(RestService.getBackendsV2(dorisOptions, readOptions, logger)); } } public String getAvailableBackend() { return getAvailableBackend(0); } public String getAvailableBackend(int subtaskId) { long tmp = pos + backends.size(); while (pos < tmp) { BackendV2.BackendRowV2 backend = backends.get((int) ((pos + subtaskId) % backends.size())); pos++; String res = backend.toBackendString(); if (tryHttpConnection(res)) { return res; } } throw new DorisRuntimeException("no available backend."); } public static boolean tryHttpConnection(String host) { try { LOG.debug("try to connect host {}", host); host = "http://" + host; URL url = new URL(host); HttpURLConnection co = (HttpURLConnection) url.openConnection(); co.setConnectTimeout(60000); co.connect(); co.disconnect(); return true; } catch (Exception ex) { LOG.warn("Failed to connect to host:{}", host, ex); return false; } } @VisibleForTesting public List<BackendRowV2> getBackends() { return backends; } } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org