This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 4601838bb9 Make flight sql client generic (#8915)
4601838bb9 is described below

commit 4601838bb9b8fe4256499301fcb094e2997eb907
Author: 张林伟 <[email protected]>
AuthorDate: Fri Dec 12 03:32:39 2025 +0800

    Make flight sql client generic (#8915)
    
    # Which issue does this PR close?
    
    None.
    
    # Rationale for this change
    
    We may not directly use `Channel` (maybe a wrapper of `Channel`), make
    flight sql client more generic to receive any type which implements
    GrpcService trait.
    
    # What changes are included in this PR?
    
    Change `Channel` to generic type.
    
    # Are these changes tested?
    
    CI.
    
    # Are there any user-facing changes?
    
    No.
    
    ---------
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 arrow-flight/src/sql/client.rs | 53 +++++++++++++++++++++++++++++++-----------
 1 file changed, 39 insertions(+), 14 deletions(-)

diff --git a/arrow-flight/src/sql/client.rs b/arrow-flight/src/sql/client.rs
index 5009ae5ea5..4fb27ab5fc 100644
--- a/arrow-flight/src/sql/client.rs
+++ b/arrow-flight/src/sql/client.rs
@@ -56,12 +56,12 @@ use arrow_ipc::{MessageHeader, root_as_message};
 use arrow_schema::{ArrowError, Schema, SchemaRef};
 use futures::{Stream, TryStreamExt, stream};
 use prost::Message;
-use tonic::transport::Channel;
+use tonic::codegen::{Body, StdError};
 use tonic::{IntoRequest, IntoStreamingRequest, Streaming};
 
 /// A FlightSQLServiceClient is an endpoint for retrieving or storing Arrow 
data
 /// by FlightSQL protocol.
-#[derive(Debug, Clone)]
+#[derive(Debug)]
 pub struct FlightSqlServiceClient<T> {
     token: Option<String>,
     headers: HashMap<String, String>,
@@ -71,14 +71,20 @@ pub struct FlightSqlServiceClient<T> {
 /// A FlightSql protocol client that can run queries against FlightSql servers
 /// This client is in the "experimental" stage. It is not guaranteed to follow 
the spec in all instances.
 /// Github issues are welcomed.
-impl FlightSqlServiceClient<Channel> {
+impl<T> FlightSqlServiceClient<T>
+where
+    T: tonic::client::GrpcService<tonic::body::Body>,
+    T::Error: Into<StdError>,
+    T::ResponseBody: Body<Data = Bytes> + Send + 'static,
+    <T::ResponseBody as Body>::Error: Into<StdError> + Send,
+{
     /// Creates a new FlightSql client that connects to a server over an 
arbitrary tonic `Channel`
-    pub fn new(channel: Channel) -> Self {
+    pub fn new(channel: T) -> Self {
         Self::new_from_inner(FlightServiceClient::new(channel))
     }
 
     /// Creates a new higher level client with the provided lower level client
-    pub fn new_from_inner(inner: FlightServiceClient<Channel>) -> Self {
+    pub fn new_from_inner(inner: FlightServiceClient<T>) -> Self {
         Self {
             token: None,
             flight_client: inner,
@@ -87,17 +93,17 @@ impl FlightSqlServiceClient<Channel> {
     }
 
     /// Return a reference to the underlying [`FlightServiceClient`]
-    pub fn inner(&self) -> &FlightServiceClient<Channel> {
+    pub fn inner(&self) -> &FlightServiceClient<T> {
         &self.flight_client
     }
 
     /// Return a mutable reference to the underlying [`FlightServiceClient`]
-    pub fn inner_mut(&mut self) -> &mut FlightServiceClient<Channel> {
+    pub fn inner_mut(&mut self) -> &mut FlightServiceClient<T> {
         &mut self.flight_client
     }
 
     /// Consume this client and return the underlying [`FlightServiceClient`]
-    pub fn into_inner(self) -> FlightServiceClient<Channel> {
+    pub fn into_inner(self) -> FlightServiceClient<T> {
         self.flight_client
     }
 
@@ -416,7 +422,10 @@ impl FlightSqlServiceClient<Channel> {
         &mut self,
         query: String,
         transaction_id: Option<Bytes>,
-    ) -> Result<PreparedStatement<Channel>, ArrowError> {
+    ) -> Result<PreparedStatement<T>, ArrowError>
+    where
+        T: Clone,
+    {
         let cmd = ActionCreatePreparedStatementRequest {
             query,
             transaction_id,
@@ -509,10 +518,10 @@ impl FlightSqlServiceClient<Channel> {
         Ok(())
     }
 
-    fn set_request_headers<T>(
+    fn set_request_headers<M>(
         &self,
-        mut req: tonic::Request<T>,
-    ) -> Result<tonic::Request<T>, ArrowError> {
+        mut req: tonic::Request<M>,
+    ) -> Result<tonic::Request<M>, ArrowError> {
         for (k, v) in &self.headers {
             let k = AsciiMetadataKey::from_str(k.as_str()).map_err(|e| {
                 ArrowError::ParseError(format!("Cannot convert header key 
\"{k}\": {e}"))
@@ -532,6 +541,16 @@ impl FlightSqlServiceClient<Channel> {
     }
 }
 
+impl<T: Clone> Clone for FlightSqlServiceClient<T> {
+    fn clone(&self) -> Self {
+        Self {
+            headers: self.headers.clone(),
+            token: self.token.clone(),
+            flight_client: self.flight_client.clone(),
+        }
+    }
+}
+
 /// A PreparedStatement
 #[derive(Debug, Clone)]
 pub struct PreparedStatement<T> {
@@ -542,9 +561,15 @@ pub struct PreparedStatement<T> {
     parameter_schema: Schema,
 }
 
-impl PreparedStatement<Channel> {
+impl<T> PreparedStatement<T>
+where
+    T: tonic::client::GrpcService<tonic::body::Body>,
+    T::Error: Into<StdError>,
+    T::ResponseBody: Body<Data = Bytes> + Send + 'static,
+    <T::ResponseBody as Body>::Error: Into<StdError> + Send,
+{
     pub(crate) fn new(
-        flight_client: FlightSqlServiceClient<Channel>,
+        flight_client: FlightSqlServiceClient<T>,
         handle: impl Into<Bytes>,
         dataset_schema: Schema,
         parameter_schema: Schema,

Reply via email to