This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new bcb9b08a726 branch-4.0: Revert "[Enhancement](sql-dialect) Support
multiple sql-converter service urls (#52636)" (#59611)
bcb9b08a726 is described below
commit bcb9b08a726acd200f8ca65383e30abdd3d46d22
Author: Mingyu Chen (Rayner) <[email protected]>
AuthorDate: Wed Jan 7 18:20:52 2026 +0800
branch-4.0: Revert "[Enhancement](sql-dialect) Support multiple
sql-converter service urls (#52636)" (#59611)
bp #59610
---
.../plugin/dialect/HttpDialectConverterPlugin.java | 9 +
.../doris/plugin/dialect/HttpDialectUtils.java | 268 +--------------
.../apache/doris/plugin/HttpDialectUtilsTest.java | 360 +--------------------
3 files changed, 38 insertions(+), 599 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/plugin/dialect/HttpDialectConverterPlugin.java
b/fe/fe-core/src/main/java/org/apache/doris/plugin/dialect/HttpDialectConverterPlugin.java
index bff2dfd1340..02b19ae0ba1 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/plugin/dialect/HttpDialectConverterPlugin.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/plugin/dialect/HttpDialectConverterPlugin.java
@@ -103,6 +103,15 @@ public class HttpDialectConverterPlugin extends Plugin implements DialectConvert
if (Strings.isNullOrEmpty(targetURL)) {
return null;
}
+ // TODO: support multiple URLs load balancing, here we just use the first one
+ String[] urlArray = targetURL.split(",");
+ for (String url : urlArray) {
+ String trimmedUrl = url.trim();
+ if (!trimmedUrl.isEmpty()) {
+ targetURL = trimmedUrl;
+ break;
+ }
+ }
return HttpDialectUtils.convertSql(targetURL, originSql,
sessionVariable.getSqlDialect(),
sessionVariable.getSqlConvertorFeatures(),
sessionVariable.getSqlConvertorConfig());
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/plugin/dialect/HttpDialectUtils.java
b/fe/fe-core/src/main/java/org/apache/doris/plugin/dialect/HttpDialectUtils.java
index 5131cf82bf6..89acd66658d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/plugin/dialect/HttpDialectUtils.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/plugin/dialect/HttpDialectUtils.java
@@ -17,9 +17,6 @@
package org.apache.doris.plugin.dialect;
-import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
-import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import lombok.Data;
@@ -33,113 +30,35 @@ import java.lang.reflect.Type;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
-import java.util.Collections;
-import java.util.Date;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
/**
* This class is used to convert sql with different dialects using sql convertor service.
* The sql convertor service is a http service which is used to convert sql.
- * <p>
- * Features:
- * - Support multiple URLs (comma separated)
- * - Blacklist mechanism for failed URLs
- * - Automatic failover and retry
- * - URL caching and smart selection
*/
public class HttpDialectUtils {
private static final Logger LOG =
LogManager.getLogger(HttpDialectUtils.class);
- // Cache URL manager instances to avoid duplicate parsing with automatic
expiration
- private static final Cache<String, UrlManager> urlManagerCache =
Caffeine.newBuilder()
- .maximumSize(10)
- .expireAfterAccess(8, TimeUnit.HOURS)
- .build();
-
- // Blacklist recovery time (ms): 1 minute
- private static final long BLACKLIST_RECOVERY_TIME_MS = 60 * 1000;
- // Connection timeout period (ms): 3 seconds
- private static final int CONNECTION_TIMEOUT_MS = 3000;
- // Read timeout period (ms): 10 seconds
- private static final int READ_TIMEOUT_MS = 10000;
-
- public static String convertSql(String targetURLs, String originStmt,
String dialect,
+ public static String convertSql(String targetURL, String originStmt,
String dialect,
String[] features, String config) {
- UrlManager urlManager = getOrCreateUrlManager(targetURLs);
ConvertRequest convertRequest = new ConvertRequest(originStmt,
dialect, features, config);
- String requestStr = convertRequest.toJson();
-
- // Try to convert SQL using intelligent URL selection strategy
- return tryConvertWithIntelligentSelection(urlManager, requestStr,
originStmt);
- }
-
- /**
- * Try to convert SQL using intelligent URL selection strategy
- * CRITICAL: This method ensures 100% success rate when ANY service is
available
- */
- private static String tryConvertWithIntelligentSelection(
- UrlManager urlManager, String requestStr, String originStmt) {
- // Strategy: Try ALL URLs in intelligent order, regardless of
blacklist status
- // This ensures 100% success rate when any service is actually
available
- List<String> allUrls = urlManager.getAllUrlsInPriorityOrder();
-
- for (String url : allUrls) {
- try {
- String result = doConvertSql(url, requestStr);
- // If no exception thrown, HTTP response was successful (200)
- // Mark URL as healthy and return result (even if empty)
- urlManager.markUrlAsHealthy(url);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Successfully converted SQL using URL: {}", url);
- }
- return result;
- } catch (Exception e) {
- LOG.warn("Failed to convert SQL using URL: {}, error: {}",
url, e.getMessage());
- // Add failed URL to blacklist for future optimization
- urlManager.markUrlAsBlacklisted(url);
- // Continue trying next URL - this is CRITICAL for 100%
success rate
- }
- }
-
- return originStmt;
- }
-
- /**
- * Get or create a URL manager
- */
- private static UrlManager getOrCreateUrlManager(String targetURLs) {
- return urlManagerCache.get(targetURLs, UrlManager::new);
- }
-
- /**
- * Perform SQL conversion for individual URL
- */
- private static String doConvertSql(String targetURL, String requestStr)
throws Exception {
HttpURLConnection connection = null;
try {
- if (targetURL == null || targetURL.trim().isEmpty()) {
- throw new Exception("Target URL is null or empty");
- }
- URL url = new URL(targetURL.trim());
+ URL url = new URL(targetURL);
connection = (HttpURLConnection) url.openConnection();
connection.setRequestMethod("POST");
connection.setRequestProperty("Content-Type", "application/json");
connection.setUseCaches(false);
connection.setDoOutput(true);
- connection.setConnectTimeout(CONNECTION_TIMEOUT_MS);
- connection.setReadTimeout(READ_TIMEOUT_MS);
+ String requestStr = convertRequest.toJson();
try (OutputStream outputStream = connection.getOutputStream()) {
outputStream.write(requestStr.getBytes(StandardCharsets.UTF_8));
}
int responseCode = connection.getResponseCode();
if (LOG.isDebugEnabled()) {
- LOG.debug("POST Response Code: {}, URL: {}, post data: {}",
responseCode, targetURL, requestStr);
+ LOG.debug("POST Response Code: {}, post data: {}",
responseCode, requestStr);
}
if (responseCode == HttpURLConnection.HTTP_OK) {
@@ -157,20 +76,26 @@ public class HttpDialectUtils {
}.getType();
ConvertResponse result = new
Gson().fromJson(response.toString(), type);
if (LOG.isDebugEnabled()) {
- LOG.debug("Convert response: {}, URL: {}", result,
targetURL);
+ LOG.debug("convert response: {}", result);
}
if (result.code == 0) {
if (!"v1".equals(result.version)) {
- throw new Exception("Unsupported version: " +
result.version);
+ LOG.warn("failed to convert sql, response version
is not v1: {}", result.version);
+ return originStmt;
}
return result.data;
} else {
- throw new Exception("Conversion failed: " +
result.message);
+ LOG.warn("failed to convert sql, response: {}",
result);
+ return originStmt;
}
}
} else {
- throw new Exception("HTTP response code: " + responseCode);
+ LOG.warn("failed to convert sql, response code: {}",
responseCode);
+ return originStmt;
}
+ } catch (Exception e) {
+ LOG.warn("failed to convert sql", e);
+ return originStmt;
} finally {
if (connection != null) {
connection.disconnect();
@@ -178,171 +103,6 @@ public class HttpDialectUtils {
}
}
- /**
- * URL Manager - Responsible for URL parsing, caching, blacklist
management, and smart selection
- */
- private static class UrlManager {
- private final List<String> parsedUrls;
- private final ConcurrentHashMap<String, BlacklistEntry> blacklist;
-
- public UrlManager(String urls) {
- this.parsedUrls = parseUrls(urls);
- this.blacklist = new ConcurrentHashMap<>();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Created UrlManager with URLs: {}, parsed: {}",
urls, parsedUrls);
- }
- }
-
- /**
- * Parse comma separated URL strings
- */
- private List<String> parseUrls(String urls) {
- List<String> result = Lists.newArrayList();
- if (urls != null && !urls.trim().isEmpty()) {
- String[] urlArray = urls.split(",");
- for (String url : urlArray) {
- String trimmedUrl = url.trim();
- if (!trimmedUrl.isEmpty()) {
- result.add(trimmedUrl);
- }
- }
- }
- return result;
- }
-
- /**
- * Mark URL as healthy (remove from blacklist)
- */
- public void markUrlAsHealthy(String url) {
- if (blacklist.remove(url) != null) {
- LOG.info("Removed URL from blacklist due to successful
request: {}", url);
- }
- }
-
- /**
- * Add URL to blacklist
- */
- public void markUrlAsBlacklisted(String url) {
- // If URL is already in blacklist, just return
- if (blacklist.containsKey(url)) {
- return;
- }
-
- long currentTime = System.currentTimeMillis();
- long recoverTime = currentTime + BLACKLIST_RECOVERY_TIME_MS;
- blacklist.put(url, new BlacklistEntry(currentTime, recoverTime));
- LOG.warn("Added URL to blacklist: {}, will recover at: {}", url,
new Date(recoverTime));
- }
-
- /**
- * Check if URL is localhost (127.0.0.1 or localhost)
- */
- private boolean isLocalhost(String url) {
- return url.contains("127.0.0.1") || url.contains("localhost");
- }
-
- /**
- * Get ALL URLs in priority order for 100% success guarantee
- * CRITICAL: This method ensures we try every URL when any service
might be available
- * <p>
- * Priority order:
- * 1. Localhost URLs (127.0.0.1 or localhost) that are healthy
- * 2. Other healthy URLs (randomly selected)
- * 3. Localhost URLs in blacklist
- * 4. Other blacklisted URLs (sorted by recovery time)
- */
- public List<String> getAllUrlsInPriorityOrder() {
- List<String> prioritizedUrls = Lists.newArrayList();
- List<String> healthyLocalhost = Lists.newArrayList();
- List<String> healthyOthers = Lists.newArrayList();
- List<String> blacklistedLocalhost = Lists.newArrayList();
- List<String> blacklistedOthers = Lists.newArrayList();
-
- long currentTime = System.currentTimeMillis();
-
- // Single traversal to categorize all URLs
- for (String url : parsedUrls) {
- BlacklistEntry entry = blacklist.get(url);
- boolean isHealthy = false;
-
- if (entry == null) {
- // URL is not in blacklist, consider it healthy
- isHealthy = true;
- } else if (currentTime >= entry.recoverTime) {
- // URL has reached recovery time, remove from blacklist
and consider healthy
- blacklist.remove(url);
- isHealthy = true;
- if (LOG.isDebugEnabled()) {
- LOG.debug("URL recovered from blacklist: {}", url);
- }
- }
-
- boolean isLocal = isLocalhost(url);
-
- if (isHealthy) {
- if (isLocal) {
- healthyLocalhost.add(url);
- } else {
- healthyOthers.add(url);
- }
- } else {
- if (isLocal) {
- blacklistedLocalhost.add(url);
- } else {
- blacklistedOthers.add(url);
- }
- }
- }
-
- // Add URLs in priority order
- // 1. Healthy localhost URLs first
- prioritizedUrls.addAll(healthyLocalhost);
-
- // 2. Other healthy URLs (randomly shuffled for load balancing)
- Collections.shuffle(healthyOthers, ThreadLocalRandom.current());
- prioritizedUrls.addAll(healthyOthers);
-
- // 3. Blacklisted localhost URLs
- prioritizedUrls.addAll(blacklistedLocalhost);
-
- // 4. Other blacklisted URLs (sorted by recovery time)
- blacklistedOthers.sort((url1, url2) -> {
- BlacklistEntry entry1 = blacklist.get(url1);
- BlacklistEntry entry2 = blacklist.get(url2);
- if (entry1 == null && entry2 == null) {
- return 0;
- }
- if (entry1 == null) {
- return -1;
- }
- if (entry2 == null) {
- return 1;
- }
- return Long.compare(entry1.recoverTime, entry2.recoverTime);
- });
- prioritizedUrls.addAll(blacklistedOthers);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("All URLs in priority order: {}", prioritizedUrls);
- }
-
- return prioritizedUrls;
- }
- }
-
- /**
- * Blacklist entry
- */
- private static class BlacklistEntry {
- final long blacklistedTime;
- final long recoverTime;
-
- BlacklistEntry(long blacklistedTime, long recoverTime) {
- this.blacklistedTime = blacklistedTime;
- this.recoverTime = recoverTime;
- }
- }
-
@Data
private static class ConvertRequest {
private String version; // CHECKSTYLE IGNORE THIS LINE
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/plugin/HttpDialectUtilsTest.java
b/fe/fe-core/src/test/java/org/apache/doris/plugin/HttpDialectUtilsTest.java
index 4ce71e196a3..de359f79475 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/plugin/HttpDialectUtilsTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/plugin/HttpDialectUtilsTest.java
@@ -28,379 +28,49 @@ import java.io.IOException;
import java.net.DatagramSocket;
import java.net.ServerSocket;
import java.net.SocketException;
-import java.util.ArrayList;
-import java.util.List;
public class HttpDialectUtilsTest {
- private List<Integer> ports = new ArrayList<>();
- private List<SimpleHttpServer> servers = new ArrayList<>();
+ private int port;
+ private SimpleHttpServer server;
@Before
public void setUp() throws Exception {
- // Create three test servers
- for (int i = 0; i < 3; i++) {
- int port = findValidPort();
- ports.add(port);
- SimpleHttpServer server = new SimpleHttpServer(port);
- server.start("/api/v1/convert");
- servers.add(server);
- }
+ port = findValidPort();
+ server = new SimpleHttpServer(port);
+ server.start("/api/v1/convert");
}
@After
public void tearDown() {
- for (SimpleHttpServer server : servers) {
- if (server != null) {
- server.stop();
- }
+ if (server != null) {
+ server.stop();
}
- servers.clear();
- ports.clear();
}
@Test
- public void testSingleUrlConvert() {
+ public void testSqlConvert() {
String originSql = "select * from t1 where \"k1\" = 1";
String expectedSql = "select * from t1 where `k1` = 1";
String[] features = new String[] {"ctas"};
- String targetURL = "http://127.0.0.1:" + ports.get(0) +
"/api/v1/convert";
-
- // Test with no response (should return original SQL)
+ String targetURL = "http://127.0.0.1:" + port + "/api/v1/convert";
String res = HttpDialectUtils.convertSql(targetURL, originSql,
"presto", features, "{}");
Assert.assertEquals(originSql, res);
-
- // Test successful conversion
- servers.get(0).setResponse(
- "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\",
\"code\": 0, \"message\": \"\"}");
+ // test presto
+ server.setResponse("{\"version\": \"v1\", \"data\": \"" + expectedSql
+ "\", \"code\": 0, \"message\": \"\"}");
res = HttpDialectUtils.convertSql(targetURL, originSql, "presto",
features, "{}");
Assert.assertEquals(expectedSql, res);
-
- // Test version error
- servers.get(0).setResponse(
- "{\"version\": \"v2\", \"data\": \"" + expectedSql + "\",
\"code\": 0, \"message\": \"\"}");
+ // test response version error
+ server.setResponse("{\"version\": \"v2\", \"data\": \"" + expectedSql
+ "\", \"code\": 0, \"message\": \"\"}");
res = HttpDialectUtils.convertSql(targetURL, originSql, "presto",
features, "{}");
Assert.assertEquals(originSql, res);
-
- // Test code error
- servers.get(0).setResponse(
+ // test response code error
+ server.setResponse(
"{\"version\": \"v1\", \"data\": \"" + expectedSql + "\",
\"code\": 400, \"message\": \"\"}");
res = HttpDialectUtils.convertSql(targetURL, originSql, "presto",
features, "{}");
Assert.assertEquals(originSql, res);
}
- @Test
- public void testMultipleUrlsConvert() {
- String originSql = "select * from t1 where \"k1\" = 1";
- String expectedSql = "select * from t1 where `k1` = 1";
- String[] features = new String[] {"ctas"};
-
- String targetURLs = "http://127.0.0.1:" + ports.get(0) +
"/api/v1/convert,"
- + "http://127.0.0.1:" + ports.get(1) + "/api/v1/convert,"
- + "http://127.0.0.1:" + ports.get(2) + "/api/v1/convert";
-
- servers.get(0).setResponse(
- "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\",
\"code\": 0, \"message\": \"\"}");
-
- String res = HttpDialectUtils.convertSql(targetURLs, originSql,
"presto", features, "{}");
- Assert.assertEquals(expectedSql, res);
- }
-
- @Test
- public void testFailoverMechanism() throws InterruptedException {
- String originSql = "select * from t1 where \"k1\" = 1";
- String expectedSql = "select * from t1 where `k1` = 1";
- String[] features = new String[] {"ctas"};
-
- String targetURLs = "http://127.0.0.1:" + ports.get(0) +
"/api/v1/convert,"
- + "http://127.0.0.1:" + ports.get(1) + "/api/v1/convert";
-
- // First server returns error, second server succeeds
- servers.get(0).setResponse(
- "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\",
\"code\": 500, \"message\": \"error\"}");
- servers.get(1).setResponse(
- "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\",
\"code\": 0, \"message\": \"\"}");
-
- String res = HttpDialectUtils.convertSql(targetURLs, originSql,
"presto", features, "{}");
- Assert.assertEquals(expectedSql, res);
- }
-
- @Test
- public void testBlacklistMechanism() throws InterruptedException {
- String originSql = "select * from t1 where \"k1\" = 1";
- String expectedSql = "select * from t1 where `k1` = 1";
- String[] features = new String[] {"ctas"};
-
- String targetURLs = "http://127.0.0.1:" + ports.get(0) +
"/api/v1/convert,"
- + "http://127.0.0.1:" + ports.get(1) + "/api/v1/convert";
-
- // Stop first server, set second server to work
- servers.get(0).stop();
- servers.get(1).setResponse(
- "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\",
\"code\": 0, \"message\": \"\"}");
-
- // First call should succeed via second server
- String res = HttpDialectUtils.convertSql(targetURLs, originSql,
"presto", features, "{}");
- Assert.assertEquals(expectedSql, res);
-
- // Restart first server
- servers.set(0, new SimpleHttpServer(ports.get(0)));
- try {
- servers.get(0).start("/api/v1/convert");
- servers.get(0).setResponse(
- "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\",
\"code\": 0, \"message\": \"\"}");
- } catch (IOException e) {
- return; // Skip test if port is occupied
- }
-
- // Should still work with blacklist recovery
- res = HttpDialectUtils.convertSql(targetURLs, originSql, "presto",
features, "{}");
- Assert.assertEquals(expectedSql, res);
- }
-
- @Test
- public void testAllUrlsFailure() {
- String originSql = "select * from t1 where \"k1\" = 1";
- String[] features = new String[] {"ctas"};
-
- String targetURLs = "http://127.0.0.1:" + ports.get(0) +
"/api/v1/convert,"
- + "http://127.0.0.1:" + ports.get(1) + "/api/v1/convert";
-
- // All servers return error
- servers.get(0).setResponse("{\"version\": \"v1\", \"data\": \"\",
\"code\": 500, \"message\": \"error\"}");
- servers.get(1).setResponse("{\"version\": \"v1\", \"data\": \"\",
\"code\": 500, \"message\": \"error\"}");
-
- String res = HttpDialectUtils.convertSql(targetURLs, originSql,
"presto", features, "{}");
- Assert.assertEquals(originSql, res);
- }
-
- @Test
- public void testUrlParsing() {
- String originSql = "select * from t1 where \"k1\" = 1";
- String expectedSql = "select * from t1 where `k1` = 1";
- String[] features = new String[] {"ctas"};
-
- // Test URL parsing with spaces and empty items
- String targetURLs = " http://127.0.0.1:" + ports.get(0) +
"/api/v1/convert , ,"
- + " http://127.0.0.1:" + ports.get(1) + "/api/v1/convert ";
-
- servers.get(0).setResponse(
- "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\",
\"code\": 0, \"message\": \"\"}");
-
- String res = HttpDialectUtils.convertSql(targetURLs, originSql,
"presto", features, "{}");
- Assert.assertEquals(expectedSql, res);
- }
-
- @Test
- public void testSeamlessFailover() throws IOException {
- String originSql = "select * from t1 where \"k1\" = 1";
- String expectedSql = "select * from t1 where `k1` = 1";
- String[] features = new String[] {"ctas"};
-
- String targetURLs = "http://127.0.0.1:" + ports.get(0) +
"/api/v1/convert,"
- + "http://127.0.0.1:" + ports.get(1) + "/api/v1/convert";
-
- // Both servers start healthy
- servers.get(0).setResponse(
- "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\",
\"code\": 0, \"message\": \"\"}");
- servers.get(1).setResponse(
- "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\",
\"code\": 0, \"message\": \"\"}");
-
- String res = HttpDialectUtils.convertSql(targetURLs, originSql,
"presto", features, "{}");
- Assert.assertEquals(expectedSql, res);
-
- // Stop first server
- servers.get(0).stop();
- res = HttpDialectUtils.convertSql(targetURLs, originSql, "presto",
features, "{}");
- Assert.assertEquals(expectedSql, res);
-
- // Restart first server, stop second
- servers.set(0, new SimpleHttpServer(ports.get(0)));
- servers.get(0).start("/api/v1/convert");
- servers.get(0).setResponse(
- "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\",
\"code\": 0, \"message\": \"\"}");
- servers.get(1).stop();
-
- // Should seamlessly switch to first server
- res = HttpDialectUtils.convertSql(targetURLs, originSql, "presto",
features, "{}");
- Assert.assertEquals(expectedSql, res);
- }
-
- @Test
- public void testConcurrentRequests() throws InterruptedException {
- String originSql = "select * from t1 where \"k1\" = 1";
- String expectedSql = "select * from t1 where `k1` = 1";
- String[] features = new String[] {"ctas"};
-
- String targetURLs = "http://127.0.0.1:" + ports.get(0) +
"/api/v1/convert,"
- + "http://127.0.0.1:" + ports.get(1) + "/api/v1/convert";
-
- servers.get(0).setResponse(
- "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\",
\"code\": 0, \"message\": \"\"}");
- servers.get(1).setResponse(
- "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\",
\"code\": 0, \"message\": \"\"}");
-
- // Test with multiple concurrent threads
- Thread[] threads = new Thread[10];
- String[] results = new String[10];
-
- for (int i = 0; i < 10; i++) {
- final int index = i;
- threads[i] = new Thread(() -> {
- results[index] = HttpDialectUtils.convertSql(targetURLs,
originSql, "presto", features, "{}");
- });
- threads[i].start();
- }
-
- for (Thread thread : threads) {
- thread.join();
- }
-
- // Verify all results
- for (String result : results) {
- Assert.assertEquals(expectedSql, result);
- }
- }
-
- @Test
- public void testZeroFailureGuarantee() throws InterruptedException {
- String originSql = "select * from t1 where \"k1\" = 1";
- String expectedSql = "select * from t1 where `k1` = 1";
- String[] features = new String[] {"ctas"};
-
- String targetURLs = "http://127.0.0.1:" + ports.get(0) +
"/api/v1/convert,"
- + "http://127.0.0.1:" + ports.get(1) + "/api/v1/convert,"
- + "http://127.0.0.1:" + ports.get(2) + "/api/v1/convert";
-
- int totalRequests = 30; // Reduced for faster testing with production
timeouts
- int successCount = 0;
-
- // Test various failure scenarios while ensuring at least one service
is always available
- for (int i = 0; i < totalRequests; i++) {
- if (i < 6) {
- // All servers healthy
- setAllServersHealthy(expectedSql);
- } else if (i < 12) {
- // Server 0 fails, others healthy
- servers.get(0)
- .setResponse("{\"version\": \"v1\", \"data\": \"\",
\"code\": 500, \"message\": \"error\"}");
- servers.get(1).setResponse(
- "{\"version\": \"v1\", \"data\": \"" + expectedSql +
"\", \"code\": 0, \"message\": \"\"}");
- servers.get(2).setResponse(
- "{\"version\": \"v1\", \"data\": \"" + expectedSql +
"\", \"code\": 0, \"message\": \"\"}");
- } else if (i < 18) {
- // Servers 0,1 fail, server 2 healthy
- servers.get(0)
- .setResponse("{\"version\": \"v1\", \"data\": \"\",
\"code\": 500, \"message\": \"error\"}");
- servers.get(1)
- .setResponse("{\"version\": \"v1\", \"data\": \"\",
\"code\": 503, \"message\": \"error\"}");
- servers.get(2).setResponse(
- "{\"version\": \"v1\", \"data\": \"" + expectedSql +
"\", \"code\": 0, \"message\": \"\"}");
- } else if (i < 24) {
- // Only server 1 healthy
- servers.get(0)
- .setResponse("{\"version\": \"v1\", \"data\": \"\",
\"code\": 500, \"message\": \"error\"}");
- servers.get(1).setResponse(
- "{\"version\": \"v1\", \"data\": \"" + expectedSql +
"\", \"code\": 0, \"message\": \"\"}");
- servers.get(2)
- .setResponse("{\"version\": \"v1\", \"data\": \"\",
\"code\": 500, \"message\": \"error\"}");
- } else {
- // Alternating recovery
- if (i % 2 == 0) {
- servers.get(0).setResponse(
- "{\"version\": \"v1\", \"data\": \"" + expectedSql
+ "\", \"code\": 0, \"message\": \"\"}");
- servers.get(1).setResponse(
- "{\"version\": \"v1\", \"data\": \"\", \"code\":
500, \"message\": \"error\"}");
- servers.get(2).setResponse(
- "{\"version\": \"v1\", \"data\": \"\", \"code\":
500, \"message\": \"error\"}");
- } else {
- servers.get(0).setResponse(
- "{\"version\": \"v1\", \"data\": \"\", \"code\":
500, \"message\": \"error\"}");
- servers.get(1).setResponse(
- "{\"version\": \"v1\", \"data\": \"\", \"code\":
500, \"message\": \"error\"}");
- servers.get(2).setResponse(
- "{\"version\": \"v1\", \"data\": \"" + expectedSql
+ "\", \"code\": 0, \"message\": \"\"}");
- }
- }
-
- String result = HttpDialectUtils.convertSql(targetURLs, originSql,
"presto", features, "{}");
- if (expectedSql.equals(result)) {
- successCount++;
- }
-
- Thread.sleep(50); // Small delay between requests
- }
-
- System.out.println("Zero Failure Guarantee Test Results:");
- System.out.println("Total requests: " + totalRequests);
- System.out.println("Successful: " + successCount);
- System.out.println("Success rate: " + (successCount * 100.0 /
totalRequests) + "%");
-
- // Must achieve 100% success rate when at least one service is
available
- Assert.assertEquals("Must achieve 100% success rate when service is
available",
- totalRequests, successCount);
- }
-
- @Test
- public void testNetworkJitterStress() throws InterruptedException {
- String originSql = "select * from t1 where \"k1\" = 1";
- String expectedSql = "select * from t1 where `k1` = 1";
- String[] features = new String[] {"ctas"};
-
- String targetURLs = "http://127.0.0.1:" + ports.get(0) +
"/api/v1/convert,"
- + "http://127.0.0.1:" + ports.get(1) + "/api/v1/convert";
-
- int totalRequests = 15; // Reduced for faster testing with production
timeouts
- int successCount = 0;
-
- // Simulate network jitter while ensuring at least one server is
always available
- for (int i = 0; i < totalRequests; i++) {
- double random = Math.random();
- if (random < 0.3) {
- // Server 0 fails, Server 1 works
- servers.get(0)
- .setResponse("{\"version\": \"v1\", \"data\": \"\",
\"code\": 500, \"message\": \"timeout\"}");
- servers.get(1).setResponse(
- "{\"version\": \"v1\", \"data\": \"" + expectedSql +
"\", \"code\": 0, \"message\": \"\"}");
- } else if (random < 0.5) {
- // Server 1 fails, Server 0 works
- servers.get(0).setResponse(
- "{\"version\": \"v1\", \"data\": \"" + expectedSql +
"\", \"code\": 0, \"message\": \"\"}");
- servers.get(1).setResponse(
- "{\"version\": \"v1\", \"data\": \"\", \"code\": 503,
\"message\": \"service unavailable\"}");
- } else {
- // Both servers work
- servers.get(0).setResponse(
- "{\"version\": \"v1\", \"data\": \"" + expectedSql +
"\", \"code\": 0, \"message\": \"\"}");
- servers.get(1).setResponse(
- "{\"version\": \"v1\", \"data\": \"" + expectedSql +
"\", \"code\": 0, \"message\": \"\"}");
- }
-
- String result = HttpDialectUtils.convertSql(targetURLs, originSql,
"presto", features, "{}");
- if (expectedSql.equals(result)) {
- successCount++;
- }
-
- Thread.sleep(100); // Delay between requests for production
timeouts
- }
-
- System.out.println("Network Jitter Test Results:");
- System.out.println("Total requests: " + totalRequests);
- System.out.println("Successful: " + successCount);
- System.out.println("Success rate: " + (successCount * 100.0 /
totalRequests) + "%");
-
- // Must achieve 100% success rate since we ensure at least one server
is always available
- Assert.assertEquals("Must handle network jitter with 100% success when
service is available",
- totalRequests, successCount);
- }
-
- private void setAllServersHealthy(String expectedSql) {
- for (int i = 0; i < 3; i++) {
- servers.get(i).setResponse(
- "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\",
\"code\": 0, \"message\": \"\"}");
- }
- }
-
private static int findValidPort() {
int port;
while (true) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]