This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 4b76b3e7f2c branch-3.1: [Enhancement](sql-dialect) Support multiple
sql-converter service urls #52636 (#53027)
4b76b3e7f2c is described below
commit 4b76b3e7f2c8873a66549edaa9ad44563412a84c
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Jul 10 16:03:09 2025 +0800
branch-3.1: [Enhancement](sql-dialect) Support multiple sql-converter
service urls #52636 (#53027)
Cherry-picked from #52636
Co-authored-by: zy-kkk <[email protected]>
---
.../doris/plugin/dialect/HttpDialectUtils.java | 268 ++++++++++++++-
.../apache/doris/plugin/HttpDialectUtilsTest.java | 360 ++++++++++++++++++++-
2 files changed, 599 insertions(+), 29 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/plugin/dialect/HttpDialectUtils.java
b/fe/fe-core/src/main/java/org/apache/doris/plugin/dialect/HttpDialectUtils.java
index 89acd66658d..5131cf82bf6 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/plugin/dialect/HttpDialectUtils.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/plugin/dialect/HttpDialectUtils.java
@@ -17,6 +17,9 @@
package org.apache.doris.plugin.dialect;
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import lombok.Data;
@@ -30,35 +33,113 @@ import java.lang.reflect.Type;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
/**
* This class is used to convert sql with different dialects using sql
convertor service.
* The sql convertor service is a http service which is used to convert sql.
+ * <p>
+ * Features:
+ * - Support multiple URLs (comma separated)
+ * - Blacklist mechanism for failed URLs
+ * - Automatic failover and retry
+ * - URL caching and smart selection
*/
public class HttpDialectUtils {
private static final Logger LOG =
LogManager.getLogger(HttpDialectUtils.class);
- public static String convertSql(String targetURL, String originStmt,
String dialect,
+    // Cache UrlManager instances (entries expire automatically) so the same URL list is parsed only once
+ private static final Cache<String, UrlManager> urlManagerCache =
Caffeine.newBuilder()
+ .maximumSize(10)
+ .expireAfterAccess(8, TimeUnit.HOURS)
+ .build();
+
+ // Blacklist recovery time (ms): 1 minute
+ private static final long BLACKLIST_RECOVERY_TIME_MS = 60 * 1000;
+ // Connection timeout period (ms): 3 seconds
+ private static final int CONNECTION_TIMEOUT_MS = 3000;
+ // Read timeout period (ms): 10 seconds
+ private static final int READ_TIMEOUT_MS = 10000;
+
+ public static String convertSql(String targetURLs, String originStmt,
String dialect,
String[] features, String config) {
+ UrlManager urlManager = getOrCreateUrlManager(targetURLs);
ConvertRequest convertRequest = new ConvertRequest(originStmt,
dialect, features, config);
+ String requestStr = convertRequest.toJson();
+
+ // Try to convert SQL using intelligent URL selection strategy
+ return tryConvertWithIntelligentSelection(urlManager, requestStr,
originStmt);
+ }
+
+    /**
+     * Tries every configured converter URL, in the order returned by
+     * {@link UrlManager#getAllUrlsInPriorityOrder()}, until one of them converts the statement.
+     * <p>
+     * Blacklisted URLs are still attempted (after the healthy ones), so the original statement is
+     * only returned when no URL is configured or every configured URL fails. A failed URL is added
+     * to the blacklist; a successful URL is removed from it.
+     *
+     * @param urlManager manager holding the parsed URLs and their blacklist state
+     * @param requestStr JSON request body sent to the converter service
+     * @param originStmt original statement, returned unchanged when no URL succeeds
+     * @return the converted SQL from the first URL that succeeds, otherwise {@code originStmt}
+     */
+    private static String tryConvertWithIntelligentSelection(
+            UrlManager urlManager, String requestStr, String originStmt) {
+        List<String> allUrls = urlManager.getAllUrlsInPriorityOrder();
+
+        for (String url : allUrls) {
+            try {
+                String result = doConvertSql(url, requestStr);
+                // doConvertSql reports every failure by throwing, so reaching this point means the
+                // service answered successfully; the result is returned as-is (even if empty).
+                urlManager.markUrlAsHealthy(url);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Successfully converted SQL using URL: {}", url);
+                }
+                return result;
+            } catch (Exception e) {
+                // e.toString() keeps the exception type; getMessage() alone can be null or
+                // ambiguous (for example only a host name for UnknownHostException).
+                LOG.warn("Failed to convert SQL using URL: {}, error: {}", url, e.toString());
+                // Remember the failure so this URL is tried after the healthy ones next time
+                urlManager.markUrlAsBlacklisted(url);
+                // Fall through to the next URL
+            }
+        }
+
+        // Either no URL is configured or all of them failed: say so instead of returning silently
+        LOG.warn("Failed to convert SQL with any of the {} configured URL(s), using the original statement",
+                allUrls.size());
+        return originStmt;
+    }
+
+    /**
+     * Returns the cached {@link UrlManager} for the given URL list, creating it on first use.
+     *
+     * @param targetURLs comma separated converter URLs; may be {@code null}
+     * @return the manager for {@code targetURLs}; a {@code null} list is treated like an empty one
+     */
+    private static UrlManager getOrCreateUrlManager(String targetURLs) {
+        // Caffeine rejects null keys with a NullPointerException. Mapping null to the empty string
+        // yields a manager without URLs, so conversion falls back to the original statement.
+        String cacheKey = targetURLs == null ? "" : targetURLs;
+        return urlManagerCache.get(cacheKey, UrlManager::new);
+    }
+
+ /**
+ * Perform SQL conversion for individual URL
+ */
+ private static String doConvertSql(String targetURL, String requestStr)
throws Exception {
HttpURLConnection connection = null;
try {
- URL url = new URL(targetURL);
+ if (targetURL == null || targetURL.trim().isEmpty()) {
+ throw new Exception("Target URL is null or empty");
+ }
+ URL url = new URL(targetURL.trim());
connection = (HttpURLConnection) url.openConnection();
connection.setRequestMethod("POST");
connection.setRequestProperty("Content-Type", "application/json");
connection.setUseCaches(false);
connection.setDoOutput(true);
+ connection.setConnectTimeout(CONNECTION_TIMEOUT_MS);
+ connection.setReadTimeout(READ_TIMEOUT_MS);
- String requestStr = convertRequest.toJson();
try (OutputStream outputStream = connection.getOutputStream()) {
outputStream.write(requestStr.getBytes(StandardCharsets.UTF_8));
}
int responseCode = connection.getResponseCode();
if (LOG.isDebugEnabled()) {
- LOG.debug("POST Response Code: {}, post data: {}",
responseCode, requestStr);
+ LOG.debug("POST Response Code: {}, URL: {}, post data: {}",
responseCode, targetURL, requestStr);
}
if (responseCode == HttpURLConnection.HTTP_OK) {
@@ -76,26 +157,20 @@ public class HttpDialectUtils {
}.getType();
ConvertResponse result = new
Gson().fromJson(response.toString(), type);
if (LOG.isDebugEnabled()) {
- LOG.debug("convert response: {}", result);
+ LOG.debug("Convert response: {}, URL: {}", result,
targetURL);
}
if (result.code == 0) {
if (!"v1".equals(result.version)) {
- LOG.warn("failed to convert sql, response version
is not v1: {}", result.version);
- return originStmt;
+ throw new Exception("Unsupported version: " +
result.version);
}
return result.data;
} else {
- LOG.warn("failed to convert sql, response: {}",
result);
- return originStmt;
+ throw new Exception("Conversion failed: " +
result.message);
}
}
} else {
- LOG.warn("failed to convert sql, response code: {}",
responseCode);
- return originStmt;
+ throw new Exception("HTTP response code: " + responseCode);
}
- } catch (Exception e) {
- LOG.warn("failed to convert sql", e);
- return originStmt;
} finally {
if (connection != null) {
connection.disconnect();
@@ -103,6 +178,171 @@ public class HttpDialectUtils {
}
}
+    /**
+     * Holds the URLs parsed from one comma separated URL list together with a blacklist of the
+     * URLs that failed recently, and decides in which order the URLs are tried.
+     */
+    private static class UrlManager {
+        private final List<String> parsedUrls;
+        private final ConcurrentHashMap<String, BlacklistEntry> blacklist;
+
+        public UrlManager(String urls) {
+            this.parsedUrls = parseUrls(urls);
+            this.blacklist = new ConcurrentHashMap<>();
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Created UrlManager with URLs: {}, parsed: {}", urls, parsedUrls);
+            }
+        }
+
+        /**
+         * Splits a comma separated URL string, trimming each item and dropping empty ones.
+         *
+         * @param urls comma separated URLs; {@code null} or blank yields an empty list
+         * @return the trimmed, non-empty URLs in their original order
+         */
+        private List<String> parseUrls(String urls) {
+            List<String> result = Lists.newArrayList();
+            if (urls != null && !urls.trim().isEmpty()) {
+                String[] urlArray = urls.split(",");
+                for (String url : urlArray) {
+                    String trimmedUrl = url.trim();
+                    if (!trimmedUrl.isEmpty()) {
+                        result.add(trimmedUrl);
+                    }
+                }
+            }
+            return result;
+        }
+
+        /**
+         * Mark URL as healthy (remove from blacklist)
+         */
+        public void markUrlAsHealthy(String url) {
+            if (blacklist.remove(url) != null) {
+                LOG.info("Removed URL from blacklist due to successful request: {}", url);
+            }
+        }
+
+        /**
+         * Add URL to blacklist. A URL that is already blacklisted keeps its existing entry, so its
+         * recovery time is not extended by repeated failures.
+         */
+        public void markUrlAsBlacklisted(String url) {
+            long currentTime = System.currentTimeMillis();
+            long recoverTime = currentTime + BLACKLIST_RECOVERY_TIME_MS;
+            // putIfAbsent makes "check, then insert" one atomic step: with a separate containsKey
+            // and put, two threads failing on the same URL could both insert and both log.
+            if (blacklist.putIfAbsent(url, new BlacklistEntry(currentTime, recoverTime)) == null) {
+                LOG.warn("Added URL to blacklist: {}, will recover at: {}", url, new Date(recoverTime));
+            }
+        }
+
+        /**
+         * Check if the host of the URL is localhost (127.0.0.1 or localhost).
+         * <p>
+         * Only the host part is inspected, so "localhost" or "127.0.0.1" appearing in a path, a
+         * query string or as part of another host name (e.g. localhost.example.com) does not count.
+         */
+        private boolean isLocalhost(String url) {
+            try {
+                String host = new URL(url).getHost();
+                return "localhost".equalsIgnoreCase(host) || "127.0.0.1".equals(host);
+            } catch (java.net.MalformedURLException e) {
+                // A URL that cannot be parsed has no usable host, so it gets no local priority
+                return false;
+            }
+        }
+
+        /**
+         * Get ALL URLs in priority order. Blacklisted URLs are included (last), so callers can
+         * still fall back to them when every healthy URL fails.
+         * <p>
+         * Priority order:
+         * 1. Localhost URLs (127.0.0.1 or localhost) that are healthy
+         * 2. Other healthy URLs (randomly shuffled)
+         * 3. Localhost URLs in blacklist
+         * 4. Other blacklisted URLs (sorted by recovery time)
+         */
+        public List<String> getAllUrlsInPriorityOrder() {
+            List<String> prioritizedUrls = Lists.newArrayList();
+            List<String> healthyLocalhost = Lists.newArrayList();
+            List<String> healthyOthers = Lists.newArrayList();
+            List<String> blacklistedLocalhost = Lists.newArrayList();
+            List<String> blacklistedOthers = Lists.newArrayList();
+            // Recovery times are captured once here, so the sort below works on stable values
+            // even if other threads add or remove blacklist entries in the meantime.
+            java.util.Map<String, Long> recoverTimes = new java.util.HashMap<>();
+
+            long currentTime = System.currentTimeMillis();
+
+            // Single traversal to categorize all URLs
+            for (String url : parsedUrls) {
+                BlacklistEntry entry = blacklist.get(url);
+                boolean isHealthy = entry == null;
+
+                if (entry != null && currentTime >= entry.recoverTime) {
+                    // Recovery time reached. remove(key, value) only drops this exact entry, so a
+                    // fresh entry added concurrently by another thread is not discarded.
+                    blacklist.remove(url, entry);
+                    isHealthy = true;
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("URL recovered from blacklist: {}", url);
+                    }
+                }
+
+                boolean isLocal = isLocalhost(url);
+
+                if (isHealthy) {
+                    if (isLocal) {
+                        healthyLocalhost.add(url);
+                    } else {
+                        healthyOthers.add(url);
+                    }
+                } else if (isLocal) {
+                    blacklistedLocalhost.add(url);
+                } else {
+                    // entry is non-null here: a URL without an entry is always healthy
+                    blacklistedOthers.add(url);
+                    recoverTimes.put(url, entry.recoverTime);
+                }
+            }
+
+            // Add URLs in priority order
+            // 1. Healthy localhost URLs first
+            prioritizedUrls.addAll(healthyLocalhost);
+
+            // 2. Other healthy URLs (randomly shuffled for load balancing)
+            Collections.shuffle(healthyOthers, ThreadLocalRandom.current());
+            prioritizedUrls.addAll(healthyOthers);
+
+            // 3. Blacklisted localhost URLs
+            prioritizedUrls.addAll(blacklistedLocalhost);
+
+            // 4. Other blacklisted URLs, earliest recovery time first
+            blacklistedOthers.sort((url1, url2) -> Long.compare(recoverTimes.get(url1), recoverTimes.get(url2)));
+            prioritizedUrls.addAll(blacklistedOthers);
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("All URLs in priority order: {}", prioritizedUrls);
+            }
+
+            return prioritizedUrls;
+        }
+    }
+
+    /**
+     * Immutable record of when a URL was put on the blacklist and when it may be used again.
+     */
+    private static class BlacklistEntry {
+        /** Time (ms) at which the URL was blacklisted. */
+        final long blacklistedTime;
+        /** Time (ms) from which the URL is considered recovered. */
+        final long recoverTime;
+
+        BlacklistEntry(long blacklistedAt, long recoverAt) {
+            blacklistedTime = blacklistedAt;
+            recoverTime = recoverAt;
+        }
+    }
+
@Data
private static class ConvertRequest {
private String version; // CHECKSTYLE IGNORE THIS LINE
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/plugin/HttpDialectUtilsTest.java
b/fe/fe-core/src/test/java/org/apache/doris/plugin/HttpDialectUtilsTest.java
index de359f79475..4ce71e196a3 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/plugin/HttpDialectUtilsTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/plugin/HttpDialectUtilsTest.java
@@ -28,49 +28,379 @@ import java.io.IOException;
import java.net.DatagramSocket;
import java.net.ServerSocket;
import java.net.SocketException;
+import java.util.ArrayList;
+import java.util.List;
public class HttpDialectUtilsTest {
- private int port;
- private SimpleHttpServer server;
+ private List<Integer> ports = new ArrayList<>();
+ private List<SimpleHttpServer> servers = new ArrayList<>();
@Before
public void setUp() throws Exception {
- port = findValidPort();
- server = new SimpleHttpServer(port);
- server.start("/api/v1/convert");
+ // Create three test servers
+ for (int i = 0; i < 3; i++) {
+ int port = findValidPort();
+ ports.add(port);
+ SimpleHttpServer server = new SimpleHttpServer(port);
+ server.start("/api/v1/convert");
+ servers.add(server);
+ }
}
@After
public void tearDown() {
- if (server != null) {
- server.stop();
+ for (SimpleHttpServer server : servers) {
+ if (server != null) {
+ server.stop();
+ }
}
+ servers.clear();
+ ports.clear();
}
@Test
- public void testSqlConvert() {
+ public void testSingleUrlConvert() {
String originSql = "select * from t1 where \"k1\" = 1";
String expectedSql = "select * from t1 where `k1` = 1";
String[] features = new String[] {"ctas"};
- String targetURL = "http://127.0.0.1:" + port + "/api/v1/convert";
+ String targetURL = "http://127.0.0.1:" + ports.get(0) +
"/api/v1/convert";
+
+ // Test with no response (should return original SQL)
String res = HttpDialectUtils.convertSql(targetURL, originSql,
"presto", features, "{}");
Assert.assertEquals(originSql, res);
- // test presto
- server.setResponse("{\"version\": \"v1\", \"data\": \"" + expectedSql
+ "\", \"code\": 0, \"message\": \"\"}");
+
+ // Test successful conversion
+ servers.get(0).setResponse(
+ "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\",
\"code\": 0, \"message\": \"\"}");
res = HttpDialectUtils.convertSql(targetURL, originSql, "presto",
features, "{}");
Assert.assertEquals(expectedSql, res);
- // test response version error
- server.setResponse("{\"version\": \"v2\", \"data\": \"" + expectedSql
+ "\", \"code\": 0, \"message\": \"\"}");
+
+ // Test version error
+ servers.get(0).setResponse(
+ "{\"version\": \"v2\", \"data\": \"" + expectedSql + "\",
\"code\": 0, \"message\": \"\"}");
res = HttpDialectUtils.convertSql(targetURL, originSql, "presto",
features, "{}");
Assert.assertEquals(originSql, res);
- // test response code error
- server.setResponse(
+
+ // Test code error
+ servers.get(0).setResponse(
"{\"version\": \"v1\", \"data\": \"" + expectedSql + "\",
\"code\": 400, \"message\": \"\"}");
res = HttpDialectUtils.convertSql(targetURL, originSql, "presto",
features, "{}");
Assert.assertEquals(originSql, res);
}
+    /** Converts through a list of three URLs of which only server 0 gets a response configured here. */
+    @Test
+    public void testMultipleUrlsConvert() {
+        final String originSql = "select * from t1 where \"k1\" = 1";
+        final String expectedSql = "select * from t1 where `k1` = 1";
+        final String[] features = {"ctas"};
+
+        // One endpoint per test server, joined into a single comma separated list
+        String targetURLs = String.join(",",
+                "http://127.0.0.1:" + ports.get(0) + "/api/v1/convert",
+                "http://127.0.0.1:" + ports.get(1) + "/api/v1/convert",
+                "http://127.0.0.1:" + ports.get(2) + "/api/v1/convert");
+
+        String successResponse =
+                "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}";
+        servers.get(0).setResponse(successResponse);
+
+        String converted = HttpDialectUtils.convertSql(targetURLs, originSql, "presto", features, "{}");
+        Assert.assertEquals(expectedSql, converted);
+    }
+
+    /** A non-zero result code from one server must make the converter move on to the other one. */
+    @Test
+    public void testFailoverMechanism() throws InterruptedException {
+        final String originSql = "select * from t1 where \"k1\" = 1";
+        final String expectedSql = "select * from t1 where `k1` = 1";
+        final String[] features = {"ctas"};
+
+        String firstUrl = "http://127.0.0.1:" + ports.get(0) + "/api/v1/convert";
+        String secondUrl = "http://127.0.0.1:" + ports.get(1) + "/api/v1/convert";
+        String targetURLs = firstUrl + "," + secondUrl;
+
+        // Server 0 reports an error code, server 1 returns a valid conversion
+        String failureResponse =
+                "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 500, \"message\": \"error\"}";
+        String successResponse =
+                "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}";
+        servers.get(0).setResponse(failureResponse);
+        servers.get(1).setResponse(successResponse);
+
+        String converted = HttpDialectUtils.convertSql(targetURLs, originSql, "presto", features, "{}");
+        Assert.assertEquals(expectedSql, converted);
+    }
+
+    /**
+     * Conversion must keep working while one server is down and after it has been restarted.
+     * The test is skipped (not passed) when the first port cannot be bound again.
+     */
+    @Test
+    public void testBlacklistMechanism() throws InterruptedException {
+        String originSql = "select * from t1 where \"k1\" = 1";
+        String expectedSql = "select * from t1 where `k1` = 1";
+        String[] features = new String[] {"ctas"};
+
+        String targetURLs = "http://127.0.0.1:" + ports.get(0) + "/api/v1/convert,"
+                + "http://127.0.0.1:" + ports.get(1) + "/api/v1/convert";
+        String successResponse =
+                "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}";
+
+        // Stop first server, set second server to work
+        servers.get(0).stop();
+        servers.get(1).setResponse(successResponse);
+
+        // First call should succeed via second server
+        String res = HttpDialectUtils.convertSql(targetURLs, originSql, "presto", features, "{}");
+        Assert.assertEquals(expectedSql, res);
+
+        // Restart a server on the first port
+        servers.set(0, new SimpleHttpServer(ports.get(0)));
+        try {
+            servers.get(0).start("/api/v1/convert");
+        } catch (IOException e) {
+            // The port was taken in the meantime. Abort as "skipped": a plain return would be
+            // reported as a pass although the second half of the test never ran.
+            org.junit.Assume.assumeNoException(e);
+        }
+        servers.get(0).setResponse(successResponse);
+
+        // Both servers answer again, so the call must succeed whichever URL is tried first
+        res = HttpDialectUtils.convertSql(targetURLs, originSql, "presto", features, "{}");
+        Assert.assertEquals(expectedSql, res);
+    }
+
+ @Test
+ public void testAllUrlsFailure() {
+ String originSql = "select * from t1 where \"k1\" = 1";
+ String[] features = new String[] {"ctas"};
+
+ String targetURLs = "http://127.0.0.1:" + ports.get(0) +
"/api/v1/convert,"
+ + "http://127.0.0.1:" + ports.get(1) + "/api/v1/convert";
+
+ // All servers return error
+ servers.get(0).setResponse("{\"version\": \"v1\", \"data\": \"\",
\"code\": 500, \"message\": \"error\"}");
+ servers.get(1).setResponse("{\"version\": \"v1\", \"data\": \"\",
\"code\": 500, \"message\": \"error\"}");
+
+ String res = HttpDialectUtils.convertSql(targetURLs, originSql,
"presto", features, "{}");
+ Assert.assertEquals(originSql, res);
+ }
+
+    /** Surrounding spaces and empty items in the URL list must not prevent a conversion. */
+    @Test
+    public void testUrlParsing() {
+        final String originSql = "select * from t1 where \"k1\" = 1";
+        final String expectedSql = "select * from t1 where `k1` = 1";
+        final String[] features = {"ctas"};
+
+        // Deliberately messy list: padded URLs with an empty item between them
+        String paddedFirstUrl = " http://127.0.0.1:" + ports.get(0) + "/api/v1/convert ";
+        String paddedSecondUrl = " http://127.0.0.1:" + ports.get(1) + "/api/v1/convert ";
+        String targetURLs = paddedFirstUrl + ", ," + paddedSecondUrl;
+
+        String successResponse =
+                "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}";
+        servers.get(0).setResponse(successResponse);
+
+        String converted = HttpDialectUtils.convertSql(targetURLs, originSql, "presto", features, "{}");
+        Assert.assertEquals(expectedSql, converted);
+    }
+
+    /** Conversion must keep succeeding while the two servers take turns being unavailable. */
+    @Test
+    public void testSeamlessFailover() throws IOException {
+        final String originSql = "select * from t1 where \"k1\" = 1";
+        final String expectedSql = "select * from t1 where `k1` = 1";
+        final String[] features = {"ctas"};
+
+        String firstUrl = "http://127.0.0.1:" + ports.get(0) + "/api/v1/convert";
+        String secondUrl = "http://127.0.0.1:" + ports.get(1) + "/api/v1/convert";
+        String targetURLs = firstUrl + "," + secondUrl;
+        String successResponse =
+                "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}";
+
+        // Phase 1: both servers answer
+        servers.get(0).setResponse(successResponse);
+        servers.get(1).setResponse(successResponse);
+        Assert.assertEquals(expectedSql,
+                HttpDialectUtils.convertSql(targetURLs, originSql, "presto", features, "{}"));
+
+        // Phase 2: first server is down
+        servers.get(0).stop();
+        Assert.assertEquals(expectedSql,
+                HttpDialectUtils.convertSql(targetURLs, originSql, "presto", features, "{}"));
+
+        // Phase 3: first server is back on the same port, second server is down
+        SimpleHttpServer restarted = new SimpleHttpServer(ports.get(0));
+        servers.set(0, restarted);
+        restarted.start("/api/v1/convert");
+        restarted.setResponse(successResponse);
+        servers.get(1).stop();
+        Assert.assertEquals(expectedSql,
+                HttpDialectUtils.convertSql(targetURLs, originSql, "presto", features, "{}"));
+    }
+
+    /** Ten threads convert the same statement at once; each of them must get the converted SQL. */
+    @Test
+    public void testConcurrentRequests() throws InterruptedException {
+        final String originSql = "select * from t1 where \"k1\" = 1";
+        final String expectedSql = "select * from t1 where `k1` = 1";
+        final String[] features = {"ctas"};
+
+        final String targetURLs = "http://127.0.0.1:" + ports.get(0) + "/api/v1/convert,"
+                + "http://127.0.0.1:" + ports.get(1) + "/api/v1/convert";
+
+        String successResponse =
+                "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}";
+        servers.get(0).setResponse(successResponse);
+        servers.get(1).setResponse(successResponse);
+
+        final int threadCount = 10;
+        Thread[] workers = new Thread[threadCount];
+        String[] results = new String[threadCount];
+
+        // Each worker stores its result in its own slot, so the workers never write the same element
+        for (int i = 0; i < threadCount; i++) {
+            final int slot = i;
+            workers[i] = new Thread(
+                    () -> results[slot] = HttpDialectUtils.convertSql(targetURLs, originSql, "presto", features, "{}"));
+            workers[i].start();
+        }
+
+        // Wait for every worker before inspecting the results
+        for (Thread worker : workers) {
+            worker.join();
+        }
+
+        for (String result : results) {
+            Assert.assertEquals(expectedSql, result);
+        }
+    }
+
+ @Test
+ public void testZeroFailureGuarantee() throws InterruptedException {
+ String originSql = "select * from t1 where \"k1\" = 1";
+ String expectedSql = "select * from t1 where `k1` = 1";
+ String[] features = new String[] {"ctas"};
+
+ String targetURLs = "http://127.0.0.1:" + ports.get(0) +
"/api/v1/convert,"
+ + "http://127.0.0.1:" + ports.get(1) + "/api/v1/convert,"
+ + "http://127.0.0.1:" + ports.get(2) + "/api/v1/convert";
+
+ int totalRequests = 30; // Reduced for faster testing with production
timeouts
+ int successCount = 0;
+
+ // Test various failure scenarios while ensuring at least one service
is always available
+ for (int i = 0; i < totalRequests; i++) {
+ if (i < 6) {
+ // All servers healthy
+ setAllServersHealthy(expectedSql);
+ } else if (i < 12) {
+ // Server 0 fails, others healthy
+ servers.get(0)
+ .setResponse("{\"version\": \"v1\", \"data\": \"\",
\"code\": 500, \"message\": \"error\"}");
+ servers.get(1).setResponse(
+ "{\"version\": \"v1\", \"data\": \"" + expectedSql +
"\", \"code\": 0, \"message\": \"\"}");
+ servers.get(2).setResponse(
+ "{\"version\": \"v1\", \"data\": \"" + expectedSql +
"\", \"code\": 0, \"message\": \"\"}");
+ } else if (i < 18) {
+ // Servers 0,1 fail, server 2 healthy
+ servers.get(0)
+ .setResponse("{\"version\": \"v1\", \"data\": \"\",
\"code\": 500, \"message\": \"error\"}");
+ servers.get(1)
+ .setResponse("{\"version\": \"v1\", \"data\": \"\",
\"code\": 503, \"message\": \"error\"}");
+ servers.get(2).setResponse(
+ "{\"version\": \"v1\", \"data\": \"" + expectedSql +
"\", \"code\": 0, \"message\": \"\"}");
+ } else if (i < 24) {
+ // Only server 1 healthy
+ servers.get(0)
+ .setResponse("{\"version\": \"v1\", \"data\": \"\",
\"code\": 500, \"message\": \"error\"}");
+ servers.get(1).setResponse(
+ "{\"version\": \"v1\", \"data\": \"" + expectedSql +
"\", \"code\": 0, \"message\": \"\"}");
+ servers.get(2)
+ .setResponse("{\"version\": \"v1\", \"data\": \"\",
\"code\": 500, \"message\": \"error\"}");
+ } else {
+ // Alternating recovery
+ if (i % 2 == 0) {
+ servers.get(0).setResponse(
+ "{\"version\": \"v1\", \"data\": \"" + expectedSql
+ "\", \"code\": 0, \"message\": \"\"}");
+ servers.get(1).setResponse(
+ "{\"version\": \"v1\", \"data\": \"\", \"code\":
500, \"message\": \"error\"}");
+ servers.get(2).setResponse(
+ "{\"version\": \"v1\", \"data\": \"\", \"code\":
500, \"message\": \"error\"}");
+ } else {
+ servers.get(0).setResponse(
+ "{\"version\": \"v1\", \"data\": \"\", \"code\":
500, \"message\": \"error\"}");
+ servers.get(1).setResponse(
+ "{\"version\": \"v1\", \"data\": \"\", \"code\":
500, \"message\": \"error\"}");
+ servers.get(2).setResponse(
+ "{\"version\": \"v1\", \"data\": \"" + expectedSql
+ "\", \"code\": 0, \"message\": \"\"}");
+ }
+ }
+
+ String result = HttpDialectUtils.convertSql(targetURLs, originSql,
"presto", features, "{}");
+ if (expectedSql.equals(result)) {
+ successCount++;
+ }
+
+ Thread.sleep(50); // Small delay between requests
+ }
+
+ System.out.println("Zero Failure Guarantee Test Results:");
+ System.out.println("Total requests: " + totalRequests);
+ System.out.println("Successful: " + successCount);
+ System.out.println("Success rate: " + (successCount * 100.0 /
totalRequests) + "%");
+
+ // Must achieve 100% success rate when at least one service is
available
+ Assert.assertEquals("Must achieve 100% success rate when service is
available",
+ totalRequests, successCount);
+ }
+
+    /**
+     * Runs 15 conversions against two servers; before each request at most one of them is randomly
+     * switched to an error response, so every request must still be converted.
+     */
+    @Test
+    public void testNetworkJitterStress() throws InterruptedException {
+        final String originSql = "select * from t1 where \"k1\" = 1";
+        final String expectedSql = "select * from t1 where `k1` = 1";
+        final String[] features = {"ctas"};
+
+        String firstUrl = "http://127.0.0.1:" + ports.get(0) + "/api/v1/convert";
+        String secondUrl = "http://127.0.0.1:" + ports.get(1) + "/api/v1/convert";
+        String targetURLs = firstUrl + "," + secondUrl;
+
+        final String ok = "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}";
+        final String timeoutError = "{\"version\": \"v1\", \"data\": \"\", \"code\": 500, \"message\": \"timeout\"}";
+        final String unavailableError =
+                "{\"version\": \"v1\", \"data\": \"\", \"code\": 503, \"message\": \"service unavailable\"}";
+
+        int totalRequests = 15; // Reduced for faster testing with production timeouts
+        int successCount = 0;
+
+        for (int i = 0; i < totalRequests; i++) {
+            double roll = Math.random();
+            // roll < 0.3: server 0 fails; 0.3 <= roll < 0.5: server 1 fails; otherwise both work
+            boolean firstFails = roll < 0.3;
+            boolean secondFails = !firstFails && roll < 0.5;
+            servers.get(0).setResponse(firstFails ? timeoutError : ok);
+            servers.get(1).setResponse(secondFails ? unavailableError : ok);
+
+            String result = HttpDialectUtils.convertSql(targetURLs, originSql, "presto", features, "{}");
+            if (expectedSql.equals(result)) {
+                successCount++;
+            }
+
+            Thread.sleep(100); // Delay between requests for production timeouts
+        }
+
+        System.out.println("Network Jitter Test Results:");
+        System.out.println("Total requests: " + totalRequests);
+        System.out.println("Successful: " + successCount);
+        System.out.println("Success rate: " + (successCount * 100.0 / totalRequests) + "%");
+
+        // At least one server worked for every request, so every request must have been converted
+        Assert.assertEquals("Must handle network jitter with 100% success when service is available",
+                totalRequests, successCount);
+    }
+
+    /** Configures the three test servers to answer with a successful conversion to {@code expectedSql}. */
+    private void setAllServersHealthy(String expectedSql) {
+        // Build the payload once; it is the same for every server
+        String successResponse =
+                "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}";
+        int index = 0;
+        while (index < 3) {
+            servers.get(index).setResponse(successResponse);
+            index++;
+        }
+    }
+
private static int findValidPort() {
int port;
while (true) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]