This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 4ae6d35ff1 [hotfix] broker selection not using table name (#9902)
4ae6d35ff1 is described below

commit 4ae6d35ff1593688c799be3c2dace8bf9fc9eab8
Author: Rong Rong <ro...@apache.org>
AuthorDate: Fri Dec 2 20:58:25 2022 -0800

    [hotfix] broker selection not using table name (#9902)
    
    * fix broker selection not using table name
    * Apply suggestions from code review
    
    Co-authored-by: Rong Rong <ro...@startree.ai>
    Co-authored-by: Xiaotian (Jackie) Jiang 
<17555551+jackie-ji...@users.noreply.github.com>
---
 .../java/org/apache/pinot/client/Connection.java   | 37 ++++++++++++++++++----
 .../apache/pinot/client/PreparedStatementTest.java | 30 ++++++++++++++++++
 2 files changed, 61 insertions(+), 6 deletions(-)

diff --git 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/Connection.java
 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/Connection.java
index 26df70c4c6..c236446d3d 100644
--- 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/Connection.java
+++ 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/Connection.java
@@ -25,6 +25,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import javax.annotation.Nullable;
+import org.apache.pinot.sql.parsers.CalciteSqlCompiler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -115,6 +116,7 @@ public class Connection {
    */
   public ResultSetGroup execute(@Nullable String tableName, String query)
       throws PinotClientException {
+    tableName = tableName == null ? resolveTableName(query) : tableName;
     String brokerHostPort = _brokerSelector.selectBroker(tableName);
     if (brokerHostPort == null) {
       throw new PinotClientException("Could not find broker to query for 
table: " + tableName);
@@ -148,11 +150,7 @@ public class Connection {
    */
   public Future<ResultSetGroup> executeAsync(String query)
       throws PinotClientException {
-    String brokerHostPort = _brokerSelector.selectBroker(null);
-    if (brokerHostPort == null) {
-      throw new PinotClientException("Could not find broker to query for 
statement: " + query);
-    }
-    return new 
ResultSetGroupFuture(_transport.executeQueryAsync(brokerHostPort, query));
+    return executeAsync(null, query);
   }
 
   /**
@@ -165,7 +163,34 @@ public class Connection {
   @Deprecated
   public Future<ResultSetGroup> executeAsync(Request request)
       throws PinotClientException {
-    return executeAsync(request.getQuery());
+    return executeAsync(null, request.getQuery());
+  }
+
+  /**
+   * Executes a query asynchronously.
+   *
+   * @param query The query to execute
+   * @return A future containing the result of the query
+   * @throws PinotClientException If an exception occurs while processing the 
query
+   */
+  public Future<ResultSetGroup> executeAsync(@Nullable String tableName, 
String query)
+      throws PinotClientException {
+    tableName = tableName == null ? resolveTableName(query) : tableName;
+    String brokerHostPort = _brokerSelector.selectBroker(tableName);
+    if (brokerHostPort == null) {
+      throw new PinotClientException("Could not find broker to query for 
statement: " + query);
+    }
+    return new 
ResultSetGroupFuture(_transport.executeQueryAsync(brokerHostPort, query));
+  }
+
+  @Nullable
+  private static String resolveTableName(String query) {
+    try {
+      return 
CalciteSqlCompiler.compileToBrokerRequest(query).querySource.tableName;
+    } catch (Exception e) {
+      LOGGER.error("Cannot parse table name from query: {}", query, e);
+      return null;
+    }
   }
 
   /**
diff --git 
a/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/PreparedStatementTest.java
 
b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/PreparedStatementTest.java
index 230fbf99be..404b1b279a 100644
--- 
a/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/PreparedStatementTest.java
+++ 
b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/PreparedStatementTest.java
@@ -20,6 +20,7 @@ package org.apache.pinot.client;
 
 import java.util.Collections;
 import java.util.concurrent.Future;
+import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -31,6 +32,26 @@ import org.testng.annotations.Test;
 public class PreparedStatementTest {
   private final DummyPinotClientTransport _dummyPinotClientTransport = new 
DummyPinotClientTransport();
 
+  @Test
+  public void testPreparedStatementWithDynamicBroker() {
+    // Create a connection with dynamic broker selector.
+    BrokerSelector mockBrokerSelector = Mockito.mock(BrokerSelector.class);
+    
Mockito.when(mockBrokerSelector.selectBroker(Mockito.anyString())).thenAnswer(i 
-> i.getArgument(0));
+    Connection connection = new Connection(mockBrokerSelector, 
_dummyPinotClientTransport);
+
+    PreparedStatement preparedStatement = connection.prepareStatement("SELECT 
foo FROM bar WHERE baz = ?");
+    preparedStatement.setString(0, "'hello'");
+    preparedStatement.execute();
+    Assert.assertEquals("SELECT foo FROM bar WHERE baz = '''hello'''", 
_dummyPinotClientTransport.getLastQuery());
+    Assert.assertEquals("bar", 
_dummyPinotClientTransport.getLastBrokerAddress());
+
+    preparedStatement = connection.prepareStatement("SELECT bar FROM foo WHERE 
baz = ?");
+    preparedStatement.setString(0, "'world'");
+    preparedStatement.executeAsync();
+    Assert.assertEquals("SELECT bar FROM foo WHERE baz = '''world'''", 
_dummyPinotClientTransport.getLastQuery());
+    Assert.assertEquals("foo", 
_dummyPinotClientTransport.getLastBrokerAddress());
+  }
+
   @Test
   public void testPreparedStatementEscaping() {
     // Create a prepared statement that has to quote a string appropriately
@@ -45,11 +66,13 @@ public class PreparedStatementTest {
   }
 
   static class DummyPinotClientTransport implements PinotClientTransport {
+    private String _lastBrokerAddress;
     private String _lastQuery;
 
     @Override
     public BrokerResponse executeQuery(String brokerAddress, String query)
         throws PinotClientException {
+      _lastBrokerAddress = brokerAddress;
       _lastQuery = query;
       return BrokerResponse.empty();
     }
@@ -57,6 +80,7 @@ public class PreparedStatementTest {
     @Override
     public Future<BrokerResponse> executeQueryAsync(String brokerAddress, 
String query)
         throws PinotClientException {
+      _lastBrokerAddress = brokerAddress;
       _lastQuery = query;
       return null;
     }
@@ -64,6 +88,7 @@ public class PreparedStatementTest {
     @Override
     public BrokerResponse executeQuery(String brokerAddress, Request request)
         throws PinotClientException {
+      _lastBrokerAddress = brokerAddress;
       _lastQuery = request.getQuery();
       return BrokerResponse.empty();
     }
@@ -71,6 +96,7 @@ public class PreparedStatementTest {
     @Override
     public Future<BrokerResponse> executeQueryAsync(String brokerAddress, 
Request request)
         throws PinotClientException {
+      _lastBrokerAddress = brokerAddress;
       _lastQuery = request.getQuery();
       return null;
     }
@@ -79,6 +105,10 @@ public class PreparedStatementTest {
       return _lastQuery;
     }
 
+    public String getLastBrokerAddress() {
+      return _lastBrokerAddress;
+    }
+
     @Override
     public void close()
         throws PinotClientException {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to