morningman commented on code in PR #52636:
URL: https://github.com/apache/doris/pull/52636#discussion_r2182367895
##########
fe/fe-core/src/main/java/org/apache/doris/plugin/dialect/HttpDialectUtils.java:
##########
@@ -76,33 +155,199 @@ public static String convertSql(String targetURL, String originStmt, String dial
                    }.getType();
                    ConvertResponse result = new Gson().fromJson(response.toString(), type);
                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("convert response: {}", result);
+                        LOG.debug("Convert response: {}, URL: {}", result, targetURL);
                    }
                    if (result.code == 0) {
                        if (!"v1".equals(result.version)) {
-                            LOG.warn("failed to convert sql, response version is not v1: {}", result.version);
-                            return originStmt;
+                            LOG.warn("Failed to convert sql, response version is not v1: {}, URL: {}",
+                                    result.version, targetURL);
+                            return null;
                        }
                        return result.data;
                    } else {
-                        LOG.warn("failed to convert sql, response: {}", result);
-                        return originStmt;
+                        LOG.warn("Failed to convert sql, response: {}, URL: {}", result, targetURL);
+                        return null;
                    }
                }
            } else {
-                LOG.warn("failed to convert sql, response code: {}", responseCode);
-                return originStmt;
+                throw new Exception("HTTP response code: " + responseCode);
            }
-        } catch (Exception e) {
-            LOG.warn("failed to convert sql", e);
-            return originStmt;
        } finally {
            if (connection != null) {
                connection.disconnect();
            }
        }
    }
+    /**
+     * URL Manager - Responsible for URL parsing, caching, blacklist management, and smart selection
+     */
+    private static class UrlManager {
+        private final List<String> parsedUrls;
+        private final ConcurrentHashMap<String, BlacklistEntry> blacklist;
+
+        public UrlManager(String urls) {
+            this.parsedUrls = parseUrls(urls);
+            this.blacklist = new ConcurrentHashMap<>();
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Created UrlManager with URLs: {}, parsed: {}", urls, parsedUrls);
+            }
+        }
+
+        /**
+         * Parse comma separated URL strings
+         */
+        private List<String> parseUrls(String urls) {
+            List<String> result = Lists.newArrayList();
+            if (urls != null && !urls.trim().isEmpty()) {
+                String[] urlArray = urls.split(",");
+                for (String url : urlArray) {
+                    String trimmedUrl = url.trim();
+                    if (!trimmedUrl.isEmpty()) {
+                        result.add(trimmedUrl);
+                    }
+                }
+            }
+            return result;
+        }
+
+        /**
+         * Mark URL as healthy (remove from blacklist)
+         */
+        public void markUrlAsHealthy(String url) {
+            if (blacklist.remove(url) != null) {
+                LOG.info("Removed URL from blacklist due to successful request: {}", url);
+            }
+        }
+
+        /**
+         * Add URL to blacklist
+         */
+        public void markUrlAsBlacklisted(String url) {
+            long currentTime = System.currentTimeMillis();
+            long recoverTime = currentTime + BLACKLIST_RECOVERY_TIME_MS;
+            BlacklistEntry existingEntry = blacklist.get(url);
+            if (existingEntry != null) {
+                // If URL is already in blacklist, limit maximum recovery time to avoid infinite extension
+                // Maximum recovery time is 2 times the original recovery time
+                long maxRecoverTime = currentTime + (BLACKLIST_RECOVERY_TIME_MS * 2);
+                recoverTime = Math.min(maxRecoverTime, existingEntry.recoverTime + BLACKLIST_RECOVERY_TIME_MS);
+            }
+            blacklist.put(url, new BlacklistEntry(currentTime, recoverTime));
+            LOG.warn("Added URL to blacklist: {}, will recover at: {}", url, new Date(recoverTime));
+        }
+
+        /**
+         * Get list of healthy URLs (not in blacklist)
+         */
+        public List<String> getHealthyUrls() {
+            List<String> healthy = Lists.newArrayList();
+            long currentTime = System.currentTimeMillis();
+            for (String url : parsedUrls) {
+                BlacklistEntry entry = blacklist.get(url);
+                if (entry == null) {
+                    // URL is not in blacklist, consider it healthy
+                    healthy.add(url);
+                } else if (currentTime >= entry.recoverTime) {
+                    // URL has reached recovery time, remove from blacklist and add to healthy list
+                    blacklist.remove(url);
+                    healthy.add(url);
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("URL recovered from blacklist: {}", url);
+                    }
+                }
+            }
+
+            // Randomly shuffle the order to avoid always trying from the first URL
+            Collections.shuffle(healthy, ThreadLocalRandom.current());
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Healthy URLs: {}", healthy);
+            }
+
+            return healthy;
+        }
+
+        /**
+         * Get list of blacklisted URLs (for immediate retry)
+         */
+        public List<String> getBlacklistedUrls() {
+            List<String> blacklisted = Lists.newArrayList();
+            long currentTime = System.currentTimeMillis();
+
+            for (String url : parsedUrls) {
+                BlacklistEntry entry = blacklist.get(url);
+                if (entry != null && currentTime < entry.recoverTime) {
+                    // URL is in blacklist and has not reached recovery time yet
+                    blacklisted.add(url);
+                }
+            }
+
+            // Sort by recovery time, prioritize URLs that should recover earlier
+            blacklisted.sort((url1, url2) -> {
+                BlacklistEntry entry1 = blacklist.get(url1);
+                BlacklistEntry entry2 = blacklist.get(url2);
+                return Long.compare(entry1.recoverTime, entry2.recoverTime);
+            });
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Blacklisted URLs for immediate retry: {}", blacklisted);
+            }
+
+            return blacklisted;
+        }
+
+        /**
+         * Get ALL URLs in priority order for 100% success guarantee
+         * CRITICAL: This method ensures we try every URL when any service might be available
+         * <p>
+         * Priority order:
+         * 1. Healthy URLs (not in blacklist or recovered) - randomly shuffled for load balancing
+         * 2. Blacklisted URLs (sorted by recovery time) - still try them for guaranteed coverage
+         */
+        public List<String> getAllUrlsInPriorityOrder() {
+            List<String> prioritizedUrls = Lists.newArrayList();
+
+            // First: Add all healthy URLs
+            List<String> healthyUrls = getHealthyUrls();
Review Comment:
You traverse `parsedUrls` twice, once in `getHealthyUrls()` and again in
`getBlacklistedUrls()`, which is inefficient. I think we can traverse the list
only once, as in the sketch below:
1. First, add `127.0.0.1` if it is available.
2. Then add the other available URLs.
3. Finally, add the URLs in the blacklist.
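For example, something along these lines (an untested sketch; treating any URL
containing `127.0.0.1` as the local one is an assumption, and
`java.util.Comparator` would need to be imported):

```java
/**
 * Build the try-order in one pass over parsedUrls:
 * local URL first, then other healthy URLs, then blacklisted URLs.
 */
public List<String> getAllUrlsInPriorityOrder() {
    long currentTime = System.currentTimeMillis();
    String localUrl = null;
    List<String> healthy = Lists.newArrayList();
    List<String> blacklisted = Lists.newArrayList();
    for (String url : parsedUrls) {
        BlacklistEntry entry = blacklist.get(url);
        if (entry != null && currentTime < entry.recoverTime) {
            // Still blacklisted: keep it only as a last resort
            blacklisted.add(url);
            continue;
        }
        // Not blacklisted, or its recovery time has passed
        blacklist.remove(url);
        if (url.contains("127.0.0.1")) {
            localUrl = url;
        } else {
            healthy.add(url);
        }
    }
    // Non-local healthy URLs in random order
    Collections.shuffle(healthy, ThreadLocalRandom.current());
    // Blacklisted URLs sorted by earliest recovery first
    blacklisted.sort(Comparator.comparingLong(url -> blacklist.get(url).recoverTime));
    List<String> result = Lists.newArrayList();
    if (localUrl != null) {
        result.add(localUrl); // 1. local service always first
    }
    result.addAll(healthy);     // 2. other available URLs
    result.addAll(blacklisted); // 3. blacklisted URLs last
    return result;
}
```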
##########
fe/fe-core/src/main/java/org/apache/doris/plugin/dialect/HttpDialectUtils.java:
##########
@@ -30,18 +31,95 @@
 import java.net.HttpURLConnection;
 import java.net.URL;
 import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadLocalRandom;
/**
 * This class is used to convert sql with different dialects using sql convertor service.
 * The sql convertor service is a http service which is used to convert sql.
+ * <p>
+ * Features:
+ * - Support multiple URLs (comma separated)
+ * - Blacklist mechanism for failed URLs
+ * - Automatic failover and retry
+ * - URL caching and smart selection
 */
public class HttpDialectUtils {
    private static final Logger LOG = LogManager.getLogger(HttpDialectUtils.class);
-    public static String convertSql(String targetURL, String originStmt, String dialect,
+    // Cache URL manager instances to avoid duplicate parsing
+    private static final ConcurrentHashMap<String, UrlManager> urlManagerCache = new ConcurrentHashMap<>();
+
+    // Blacklist recovery time (ms): 5 minutes
+    private static final long BLACKLIST_RECOVERY_TIME_MS = 5 * 60 * 1000;
Review Comment:
Too long; 1 min is enough, because the sql convertor should be restarted very
soon.
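If the constant is shortened as suggested, it could read (same constant name
assumed):

```java
// Blacklist recovery time (ms): 1 minute; the sql convertor is expected to restart quickly
private static final long BLACKLIST_RECOVERY_TIME_MS = 60 * 1000;
```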
##########
fe/fe-core/src/main/java/org/apache/doris/plugin/dialect/HttpDialectUtils.java:
##########
@@ -76,33 +155,199 @@ public static String convertSql(String targetURL, String originStmt, String dial
                    }.getType();
                    ConvertResponse result = new Gson().fromJson(response.toString(), type);
                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("convert response: {}", result);
+                        LOG.debug("Convert response: {}, URL: {}", result, targetURL);
                    }
                    if (result.code == 0) {
                        if (!"v1".equals(result.version)) {
-                            LOG.warn("failed to convert sql, response version is not v1: {}", result.version);
-                            return originStmt;
+                            LOG.warn("Failed to convert sql, response version is not v1: {}, URL: {}",
+                                    result.version, targetURL);
+                            return null;
                        }
                        return result.data;
                    } else {
-                        LOG.warn("failed to convert sql, response: {}", result);
-                        return originStmt;
+                        LOG.warn("Failed to convert sql, response: {}, URL: {}", result, targetURL);
+                        return null;
                    }
                }
            } else {
-                LOG.warn("failed to convert sql, response code: {}", responseCode);
-                return originStmt;
+                throw new Exception("HTTP response code: " + responseCode);
            }
-        } catch (Exception e) {
-            LOG.warn("failed to convert sql", e);
-            return originStmt;
        } finally {
            if (connection != null) {
                connection.disconnect();
            }
        }
    }
+    /**
+     * URL Manager - Responsible for URL parsing, caching, blacklist management, and smart selection
+     */
+    private static class UrlManager {
+        private final List<String> parsedUrls;
+        private final ConcurrentHashMap<String, BlacklistEntry> blacklist;
+
+        public UrlManager(String urls) {
+            this.parsedUrls = parseUrls(urls);
+            this.blacklist = new ConcurrentHashMap<>();
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Created UrlManager with URLs: {}, parsed: {}", urls, parsedUrls);
+            }
+        }
+
+        /**
+         * Parse comma separated URL strings
+         */
+        private List<String> parseUrls(String urls) {
+            List<String> result = Lists.newArrayList();
+            if (urls != null && !urls.trim().isEmpty()) {
+                String[] urlArray = urls.split(",");
+                for (String url : urlArray) {
+                    String trimmedUrl = url.trim();
+                    if (!trimmedUrl.isEmpty()) {
+                        result.add(trimmedUrl);
+                    }
+                }
+            }
+            return result;
+        }
+
+        /**
+         * Mark URL as healthy (remove from blacklist)
+         */
+        public void markUrlAsHealthy(String url) {
+            if (blacklist.remove(url) != null) {
+                LOG.info("Removed URL from blacklist due to successful request: {}", url);
+            }
+        }
+
+        /**
+         * Add URL to blacklist
+         */
+        public void markUrlAsBlacklisted(String url) {
+            long currentTime = System.currentTimeMillis();
+            long recoverTime = currentTime + BLACKLIST_RECOVERY_TIME_MS;
+            BlacklistEntry existingEntry = blacklist.get(url);
+            if (existingEntry != null) {
+                // If URL is already in blacklist, limit maximum recovery time to avoid infinite extension
+                // Maximum recovery time is 2 times the original recovery time
+                long maxRecoverTime = currentTime + (BLACKLIST_RECOVERY_TIME_MS * 2);
+                recoverTime = Math.min(maxRecoverTime, existingEntry.recoverTime + BLACKLIST_RECOVERY_TIME_MS);
+            }
+            blacklist.put(url, new BlacklistEntry(currentTime, recoverTime));
+            LOG.warn("Added URL to blacklist: {}, will recover at: {}", url, new Date(recoverTime));
+        }
+
+        /**
+         * Get list of healthy URLs (not in blacklist)
+         */
+        public List<String> getHealthyUrls() {
+            List<String> healthy = Lists.newArrayList();
+            long currentTime = System.currentTimeMillis();
+            for (String url : parsedUrls) {
+                BlacklistEntry entry = blacklist.get(url);
+                if (entry == null) {
+                    // URL is not in blacklist, consider it healthy
+                    healthy.add(url);
+                } else if (currentTime >= entry.recoverTime) {
+                    // URL has reached recovery time, remove from blacklist and add to healthy list
+                    blacklist.remove(url);
+                    healthy.add(url);
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("URL recovered from blacklist: {}", url);
+                    }
+                }
+            }
+
+            // Randomly shuffle the order to avoid always trying from the first URL
+            Collections.shuffle(healthy, ThreadLocalRandom.current());
Review Comment:
Do not shuffle, because we actually always want to select the local one (with
`127.0.0.1`). So the logic should be: always try to select the URL with
`127.0.0.1` first; if it is not available, select another one randomly.
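Something like this untested sketch (`pickUrl` is a hypothetical helper name,
and matching the local service by the substring `127.0.0.1` is an assumption):

```java
// Prefer the local convertor; otherwise pick a random healthy URL.
private static String pickUrl(List<String> healthyUrls) {
    for (String url : healthyUrls) {
        if (url.contains("127.0.0.1")) {
            return url; // always prefer the local service
        }
    }
    if (healthyUrls.isEmpty()) {
        return null; // caller falls back to blacklisted URLs
    }
    return healthyUrls.get(ThreadLocalRandom.current().nextInt(healthyUrls.size()));
}
```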
##########
fe/fe-core/src/main/java/org/apache/doris/plugin/dialect/HttpDialectUtils.java:
##########
@@ -76,33 +155,199 @@ public static String convertSql(String targetURL, String originStmt, String dial
                    }.getType();
                    ConvertResponse result = new Gson().fromJson(response.toString(), type);
                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("convert response: {}", result);
+                        LOG.debug("Convert response: {}, URL: {}", result, targetURL);
                    }
                    if (result.code == 0) {
                        if (!"v1".equals(result.version)) {
-                            LOG.warn("failed to convert sql, response version is not v1: {}", result.version);
-                            return originStmt;
+                            LOG.warn("Failed to convert sql, response version is not v1: {}, URL: {}",
+                                    result.version, targetURL);
+                            return null;
                        }
                        return result.data;
                    } else {
-                        LOG.warn("failed to convert sql, response: {}", result);
-                        return originStmt;
+                        LOG.warn("Failed to convert sql, response: {}, URL: {}", result, targetURL);
+                        return null;
                    }
                }
            } else {
-                LOG.warn("failed to convert sql, response code: {}", responseCode);
-                return originStmt;
+                throw new Exception("HTTP response code: " + responseCode);
            }
-        } catch (Exception e) {
-            LOG.warn("failed to convert sql", e);
-            return originStmt;
        } finally {
            if (connection != null) {
                connection.disconnect();
            }
        }
    }
+    /**
+     * URL Manager - Responsible for URL parsing, caching, blacklist management, and smart selection
+     */
+    private static class UrlManager {
+        private final List<String> parsedUrls;
+        private final ConcurrentHashMap<String, BlacklistEntry> blacklist;
+
+        public UrlManager(String urls) {
+            this.parsedUrls = parseUrls(urls);
+            this.blacklist = new ConcurrentHashMap<>();
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Created UrlManager with URLs: {}, parsed: {}", urls, parsedUrls);
+            }
+        }
+
+        /**
+         * Parse comma separated URL strings
+         */
+        private List<String> parseUrls(String urls) {
+            List<String> result = Lists.newArrayList();
+            if (urls != null && !urls.trim().isEmpty()) {
+                String[] urlArray = urls.split(",");
+                for (String url : urlArray) {
+                    String trimmedUrl = url.trim();
+                    if (!trimmedUrl.isEmpty()) {
+                        result.add(trimmedUrl);
+                    }
+                }
+            }
+            return result;
+        }
+
+        /**
+         * Mark URL as healthy (remove from blacklist)
+         */
+        public void markUrlAsHealthy(String url) {
+            if (blacklist.remove(url) != null) {
+                LOG.info("Removed URL from blacklist due to successful request: {}", url);
+            }
+        }
+
+        /**
+         * Add URL to blacklist
+         */
+        public void markUrlAsBlacklisted(String url) {
+            long currentTime = System.currentTimeMillis();
+            long recoverTime = currentTime + BLACKLIST_RECOVERY_TIME_MS;
+            BlacklistEntry existingEntry = blacklist.get(url);
+            if (existingEntry != null) {
+                // If URL is already in blacklist, limit maximum recovery time to avoid infinite extension
+                // Maximum recovery time is 2 times the original recovery time
Review Comment:
No need; that is too complicated. I think we can just return if the URL is
already in the blacklist.
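An untested sketch of the simplified version; `putIfAbsent` keeps the existing
entry, so a URL that is already blacklisted is left untouched:

```java
public void markUrlAsBlacklisted(String url) {
    long currentTime = System.currentTimeMillis();
    BlacklistEntry newEntry = new BlacklistEntry(currentTime, currentTime + BLACKLIST_RECOVERY_TIME_MS);
    // putIfAbsent returns null only if the URL was not blacklisted before
    if (blacklist.putIfAbsent(url, newEntry) == null) {
        LOG.warn("Added URL to blacklist: {}, will recover at: {}", url, new Date(newEntry.recoverTime));
    }
}
```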
##########
fe/fe-core/src/main/java/org/apache/doris/plugin/dialect/HttpDialectUtils.java:
##########
@@ -30,18 +31,95 @@
 import java.net.HttpURLConnection;
 import java.net.URL;
 import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadLocalRandom;
/**
 * This class is used to convert sql with different dialects using sql convertor service.
 * The sql convertor service is a http service which is used to convert sql.
+ * <p>
+ * Features:
+ * - Support multiple URLs (comma separated)
+ * - Blacklist mechanism for failed URLs
+ * - Automatic failover and retry
+ * - URL caching and smart selection
 */
public class HttpDialectUtils {
    private static final Logger LOG = LogManager.getLogger(HttpDialectUtils.class);
-    public static String convertSql(String targetURL, String originStmt, String dialect,
+    // Cache URL manager instances to avoid duplicate parsing
+    private static final ConcurrentHashMap<String, UrlManager> urlManagerCache = new ConcurrentHashMap<>();
+
+    // Blacklist recovery time (ms): 5 minutes
+    private static final long BLACKLIST_RECOVERY_TIME_MS = 5 * 60 * 1000;
+    // Connection timeout period (ms): 3 seconds
+    private static final int CONNECTION_TIMEOUT_MS = 3000;
+    // Read timeout period (ms): 10 seconds
+    private static final int READ_TIMEOUT_MS = 10000;
+
+    public static String convertSql(String targetURLs, String originStmt, String dialect,
            String[] features, String config) {
+        if (targetURLs == null || targetURLs.trim().isEmpty()) {
Review Comment:
This check can be done when the variable is set. See `checker()` in `public
@interface VarAttr`.
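An illustrative sketch of that approach; the field, variable name, and checker
method below are hypothetical, assuming the usual convention that the checker
receives the new value as a String and throws on invalid input:

```java
// In SessionVariable: validate the URL list when the variable is set,
// so convertSql() never sees a null/empty value.
@VariableMgr.VarAttr(name = SQL_CONVERTOR_SERVICE_URLS, checker = "checkSqlConvertorServiceUrls")
public String sqlConvertorServiceUrls = "";

public void checkSqlConvertorServiceUrls(String value) {
    if (value == null || value.trim().isEmpty()) {
        throw new IllegalArgumentException("sql convertor service URLs cannot be empty");
    }
}
```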
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]