This is an automated email from the ASF dual-hosted git repository. rongr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 4ae6d35ff1 [hotfix] broker selection not using table name (#9902) 4ae6d35ff1 is described below commit 4ae6d35ff1593688c799be3c2dace8bf9fc9eab8 Author: Rong Rong <ro...@apache.org> AuthorDate: Fri Dec 2 20:58:25 2022 -0800 [hotfix] broker selection not using table name (#9902) * fix broker selection not using table name * Apply suggestions from code review Co-authored-by: Rong Rong <ro...@startree.ai> Co-authored-by: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> --- .../java/org/apache/pinot/client/Connection.java | 37 ++++++++++++++++++---- .../apache/pinot/client/PreparedStatementTest.java | 30 ++++++++++++++++++ 2 files changed, 61 insertions(+), 6 deletions(-) diff --git a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/Connection.java b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/Connection.java index 26df70c4c6..c236446d3d 100644 --- a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/Connection.java +++ b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/Connection.java @@ -25,6 +25,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import javax.annotation.Nullable; +import org.apache.pinot.sql.parsers.CalciteSqlCompiler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -115,6 +116,7 @@ public class Connection { */ public ResultSetGroup execute(@Nullable String tableName, String query) throws PinotClientException { + tableName = tableName == null ? resolveTableName(query) : tableName; String brokerHostPort = _brokerSelector.selectBroker(tableName); if (brokerHostPort == null) { throw new PinotClientException("Could not find broker to query for table: " + tableName); @@ -148,11 +150,7 @@ public class Connection { */ public Future<ResultSetGroup> executeAsync(String query) throws PinotClientException { - String brokerHostPort = _brokerSelector.selectBroker(null); - if (brokerHostPort == null) { - throw new PinotClientException("Could not find broker to query for statement: " + query); - } - return new ResultSetGroupFuture(_transport.executeQueryAsync(brokerHostPort, query)); + return executeAsync(null, query); } /** @@ -165,7 +163,34 @@ public class Connection { @Deprecated public Future<ResultSetGroup> executeAsync(Request request) throws PinotClientException { - return executeAsync(request.getQuery()); + return executeAsync(null, request.getQuery()); + } + + /** + * Executes a query asynchronously. + * + * @param query The query to execute + * @return A future containing the result of the query + * @throws PinotClientException If an exception occurs while processing the query + */ + public Future<ResultSetGroup> executeAsync(@Nullable String tableName, String query) + throws PinotClientException { + tableName = tableName == null ? resolveTableName(query) : tableName; + String brokerHostPort = _brokerSelector.selectBroker(tableName); + if (brokerHostPort == null) { + throw new PinotClientException("Could not find broker to query for statement: " + query); + } + return new ResultSetGroupFuture(_transport.executeQueryAsync(brokerHostPort, query)); + } + + @Nullable + private static String resolveTableName(String query) { + try { + return CalciteSqlCompiler.compileToBrokerRequest(query).querySource.tableName; + } catch (Exception e) { + LOGGER.error("Cannot parse table name from query: {}", query, e); + return null; + } } /** diff --git a/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/PreparedStatementTest.java b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/PreparedStatementTest.java index 230fbf99be..404b1b279a 100644 --- a/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/PreparedStatementTest.java +++ b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/PreparedStatementTest.java @@ -20,6 +20,7 @@ package org.apache.pinot.client; import java.util.Collections; import java.util.concurrent.Future; +import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.Test; @@ -31,6 +32,26 @@ import org.testng.annotations.Test; public class PreparedStatementTest { private final DummyPinotClientTransport _dummyPinotClientTransport = new DummyPinotClientTransport(); + @Test + public void testPreparedStatementWithDynamicBroker() { + // Create a connection with dynamic broker selector. + BrokerSelector mockBrokerSelector = Mockito.mock(BrokerSelector.class); + Mockito.when(mockBrokerSelector.selectBroker(Mockito.anyString())).thenAnswer(i -> i.getArgument(0)); + Connection connection = new Connection(mockBrokerSelector, _dummyPinotClientTransport); + + PreparedStatement preparedStatement = connection.prepareStatement("SELECT foo FROM bar WHERE baz = ?"); + preparedStatement.setString(0, "'hello'"); + preparedStatement.execute(); + Assert.assertEquals("SELECT foo FROM bar WHERE baz = '''hello'''", _dummyPinotClientTransport.getLastQuery()); + Assert.assertEquals("bar", _dummyPinotClientTransport.getLastBrokerAddress()); + + preparedStatement = connection.prepareStatement("SELECT bar FROM foo WHERE baz = ?"); + preparedStatement.setString(0, "'world'"); + preparedStatement.executeAsync(); + Assert.assertEquals("SELECT bar FROM foo WHERE baz = '''world'''", _dummyPinotClientTransport.getLastQuery()); + Assert.assertEquals("foo", _dummyPinotClientTransport.getLastBrokerAddress()); + } + @Test public void testPreparedStatementEscaping() { // Create a prepared statement that has to quote a string appropriately @@ -45,11 +66,13 @@ public class PreparedStatementTest { } static class DummyPinotClientTransport implements PinotClientTransport { + private String _lastBrokerAddress; private String _lastQuery; @Override public BrokerResponse executeQuery(String brokerAddress, String query) throws PinotClientException { + _lastBrokerAddress = brokerAddress; _lastQuery = query; return BrokerResponse.empty(); } @@ -57,6 +80,7 @@ public class PreparedStatementTest { @Override public Future<BrokerResponse> executeQueryAsync(String brokerAddress, String query) throws PinotClientException { + _lastBrokerAddress = brokerAddress; _lastQuery = query; return null; } @@ -64,6 +88,7 @@ public class PreparedStatementTest { @Override public BrokerResponse executeQuery(String brokerAddress, Request request) throws PinotClientException { + _lastBrokerAddress = brokerAddress; _lastQuery = request.getQuery(); return BrokerResponse.empty(); } @@ -71,6 +96,7 @@ public class PreparedStatementTest { @Override public Future<BrokerResponse> executeQueryAsync(String brokerAddress, Request request) throws PinotClientException { + _lastBrokerAddress = brokerAddress; _lastQuery = request.getQuery(); return null; } @@ -79,6 +105,10 @@ public class PreparedStatementTest { return _lastQuery; } + public String getLastBrokerAddress() { + return _lastBrokerAddress; + } + @Override public void close() throws PinotClientException { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org