Susmit07 opened a new issue, #753:
URL: https://github.com/apache/arrow-java/issues/753

   ### Describe the bug, including details regarding any error messages, version, and platform.
   
   Hi Team,
   
   We are using the Scala code snippet below to query the Flight server, but the client appears to be stuck indefinitely. We see this issue when testing from our local workstation (a Mac, to be precise).
   
   Another interesting thing: the client propagates error messages from the server correctly, but not the FlightStream data. The same code works fine when we run it inside a Linux VM.
   
   Do you folks see any issue in the code?
   
   ```
   def fetchDataStreamIterator(details: BaseDataAccessDetails): Iterator[FlightStream] = {
     logger.info(s"Fetching data for details: ${details.toString}")
     val ticketStr = buildTicketStr(details)
     logger.info(s"Generated ticket string: $ticketStr")
   
     val allocator = new RootAllocator(Long.MaxValue)
     val client = FlightClient.builder(allocator, Location.forGrpcInsecure(serverHost, serverPort)).build()
   
     try {
       val ticket = new Ticket(ticketStr.getBytes(StandardCharsets.UTF_8))
       val stream = client.getStream(ticket)
   
       Iterator.continually {
         if (stream.next()) Some(stream) else {
           // Cleanup when no more batches
           close(stream, client)
           None
         }
       }.takeWhile(_.isDefined).flatten
     } catch {
       case e: FlightRuntimeException =>
         logger.error(s"Error communicating with Flight server: ${e.getMessage}")
         throw new CefFlightServerException(s"Error communicating with Flight server: ${e.getMessage}", e)
       case e: Exception =>
         logger.error(s"Failed to fetch data: ${e.getMessage}")
         throw new CefFlightServerException("Failed to fetch data from Flight Server", e)
     }
   }
   ```
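   
   For reviewers, the read loop above boils down to the following simplified sketch (not our production code; `serverHost`, `serverPort`, and the ticket string are the same as above, and cleanup is made explicit with try/finally):
   
   ```
   import java.nio.charset.StandardCharsets
   
   import org.apache.arrow.flight.{FlightClient, Location, Ticket}
   import org.apache.arrow.memory.RootAllocator
   
   // Simplified, eagerly-draining variant of the loop above; resources are
   // released in the finally block regardless of where a failure occurs.
   def drainStream(serverHost: String, serverPort: Int, ticketStr: String): Long = {
     val allocator = new RootAllocator(Long.MaxValue)
     val client = FlightClient
       .builder(allocator, Location.forGrpcInsecure(serverHost, serverPort))
       .build()
     val stream = client.getStream(new Ticket(ticketStr.getBytes(StandardCharsets.UTF_8)))
     var rows = 0L
     try {
       // next() blocks until the next record batch (or end-of-stream) arrives;
       // this is where the client appears to hang for us.
       while (stream.next()) {
         rows += stream.getRoot.getRowCount
       }
       rows
     } finally {
       stream.close()
       client.close()
       allocator.close()
     }
   }
   ```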
   
   Attaching the client thread dump for reference
   
   
[threads_report.txt](https://github.com/user-attachments/files/20182747/threads_report.txt)
   
   Strangely, if we pass a dummy, non-existent S3 path, we get a proper error back from the server:
   
   ```
   cef_flight_server.exceptions.S3AccessError: Failed to access S3: [Errno 2] Path does not exist 'bg0975-cef-ccmedev-data/pp/load_date=2024-11-21/part-00007.c008.snappy.parquet'. Detail: [errno 2] No such file or directory
   ```
   
   This indicates that the server is reachable, and we do see the corresponding logs on the server as well.
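   
   For reference, this is roughly how that server-side error surfaces in our Scala client (an illustrative sketch only; `client`, the bad ticket, and `logger` are assumed to be set up the same way as in the snippet above):
   
   ```
   import org.apache.arrow.flight.FlightRuntimeException
   
   // Connectivity / error-propagation check: a ticket pointing at a bad S3 path
   // makes the server-side S3AccessError surface here as a FlightRuntimeException.
   def checkErrorPropagation(client: FlightClient, badTicket: Ticket): Unit = {
     try {
       val stream = client.getStream(badTicket)
       while (stream.next()) {
         // not reached for a non-existent path
       }
       stream.close()
     } catch {
       case e: FlightRuntimeException =>
         // status() carries the gRPC status code plus the server's error text
         logger.error(s"Flight error: ${e.status().code()} - ${e.status().description()}")
     }
   }
   ```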
   
   
   The Flight server is written in Python. Below is the `do_get` implementation:
   
   ```
   def do_get(self, context, ticket):
       """
           Handles client requests for data. The ticket will contain:
           - access_key: S3 access key
           - secret_key: S3 secret key
           - s3_path: Full S3 path (e.g., bucket_name/object_key)
           - mode: 'batch' (for batch streaming) or 'full' (for loading the entire dataset)
       """
       try:
           # Parse the ticket to extract credentials, S3 path, and mode
           access_key, secret_key, s3_path, mode, batch_size = parse_ticket(ticket)
       except InvalidTicketFormatError as e:
           logging.error(str(e))
           raise
       except InvalidModeError as e:
           logging.error(str(e))
           raise
   
       # s3fs doesn't need the s3:// (s3a) protocol prefix.
       if s3_path.startswith("s3://"):
           s3_path = s3_path.replace("s3://", "", 1)
   
       logging.info(f"Cloudian S3 Override endpoint: {Config.S3_ENDPOINT_OVERRIDE}")
       logging.info(f"Cloudian S3 Region: {Config.S3_REGION}")
       logging.info(f"Fetching Parquet data from S3: {s3_path} in mode: {mode}")
   
       # Initialize the S3 handler with credentials
       try:
           s3_handler = S3Handler(
               endpoint=Config.S3_ENDPOINT_OVERRIDE,
               region=Config.S3_REGION,
               access_key=access_key,
               secret_key=secret_key
           )
       except Exception as e:
           logging.error(f"Error initializing S3 handler: {str(e)}")
           raise S3AccessError(f"Error initializing S3 handler: {str(e)}") from e
   
       if mode == DataRetrievalMode.BATCH:
           try:
               # Use the get_parquet_data method for both wildcard and non-wildcard cases
               parquet_data = s3_handler.get_parquet_data(s3_path)
               # parquet_data.schema: used when parquet_data is an instance of ds.Dataset
               # (i.e., when multiple Parquet files are being processed as a dataset).
               #
               # parquet_data.schema_arrow: used when parquet_data is a pyarrow.parquet.ParquetFile;
               # a single Parquet file exposes its Arrow schema via schema_arrow.
               schema = parquet_data.schema if isinstance(parquet_data, ds.Dataset) else parquet_data.schema_arrow
               return flight.GeneratorStream(schema, s3_handler.stream_parquet_batches(parquet_data, batch_size))
           except OSError as e:
               logging.error(f"AWS S3 access error: {str(e)}")
               raise S3AccessError(f"Failed to access S3: {str(e)}") from e
           except Exception as e:
               logging.error(f"Error streaming Parquet data: {str(e)}")
               raise DataProcessingError(f"Error streaming Parquet data: {str(e)}") from e
   
       # Handle 'full' mode to load the entire dataset
       elif mode == DataRetrievalMode.FULL:
           try:
               # Check if the S3 path contains a wildcard and the mode is FULL
               if "*" in s3_path:
                   logging.warning(
                       f"Wildcard pattern detected in S3 path '{s3_path}' with FULL data retrieval mode. "
                       f"This may put pressure on memory as all files will be loaded into memory at once."
                   )
               # Use the same get_parquet_data method for both wildcard and non-wildcard cases
               parquet_data = s3_handler.get_parquet_data(s3_path)
               # Load the entire dataset into memory (chance of OOM),
               # handling Dataset vs. ParquetFile
               if isinstance(parquet_data, ds.Dataset):
                   table = parquet_data.to_table()
               else:
                   table = parquet_data.read()
               return flight.RecordBatchStream(table)
           except OSError as e:
               logging.error(f"AWS S3 access error: {str(e)}")
               raise S3AccessError(f"Failed to access S3: {str(e)}") from e
           except Exception as e:
               logging.error(f"Error loading full Parquet dataset: {str(e)}")
               raise DataProcessingError(f"Error loading full Parquet dataset: {str(e)}") from e
   
       else:
           logging.error(f"Invalid mode: {DataRetrievalMode.from_string(mode)}. Expected 'batch' or 'full'.")
           raise InvalidModeError()
   
   
   # Helper functions.
   def get_parquet_data(self, s3_path):
       """
           Retrieves Parquet data from S3. If the path contains a wildcard pattern, it lists all matching files manually.
           If it's a single file, it reads the file directly.
   
           :param s3_path: The S3 path, which could be a wildcard pattern or a direct file path.
           :return: PyArrow Dataset object if it's a wildcard, or a ParquetFile object for a single file.
       """
       try:
           # Check if the path contains a wildcard
           if "*" in s3_path:
               # Split the directory and pattern (e.g., `*.parquet`)
               directory, pattern = s3_path.rsplit("/", 1)
   
               # List all files in the directory and filter using the pattern
               logging.info(f"Fetching Parquet files matching wildcard: {s3_path}")
               files = self.s3_fs.get_file_info(fs.FileSelector(directory))
   
               # Filter files matching the pattern (e.g., *.parquet) and sort them by modification time (mtime_ns)
               sorted_file_paths = [file.path for file in sorted(files, key=lambda file: file.mtime_ns) if fnmatch.fnmatch(file.path, f"{directory}/{pattern}")]
   
               if not sorted_file_paths:
                   raise FileNotFoundError(f"No files matching pattern {pattern} found in {directory}")
   
               logging.info(f"Sorted files: {sorted_file_paths}")
   
               # Validate schemas across all files
               if not validate_schemas(sorted_file_paths, self.s3_fs):
                   raise ValueError("Schema mismatch detected across files.")
   
               # Create a dataset from the matching files
               dataset = ds.dataset(sorted_file_paths, format="parquet", filesystem=self.s3_fs)
               return dataset
           else:
               # Handle single file case: read the specific Parquet file
               logging.info(f"Fetching single Parquet file: {s3_path}")
               parquet_file = pq.ParquetFile(self.s3_fs.open_input_file(s3_path))
               return parquet_file
       except Exception as e:
           logging.error(f"Error fetching Parquet data from S3: {e}")
           raise e
   
   @staticmethod
   def stream_parquet_batches(parquet_data, batch_size=None):
       """
           Stream the Parquet data in batches. Supports both datasets (multiple files) and single Parquet files.
   
           :param parquet_data: The Dataset or ParquetFile object to stream data from.
           :param batch_size: The size of the batches to stream. Default is 100,000 if not provided.
           :return: Generator for streaming Parquet batches.
       """
       try:
           # Ensure batch_size is an integer, set default if None
           if batch_size is None or not isinstance(batch_size, int):
               batch_size = 100000
   
           if isinstance(parquet_data, ds.Dataset):
               # If it's a dataset (multiple files), stream dataset batches using batch_size
               logging.info("Streaming Parquet data in batches from a dataset")
               for batch in parquet_data.to_batches(batch_size=batch_size):
                   yield batch
           else:
               # If it's a single file (ParquetFile), stream file batches (iter_batches)
               logging.info("Streaming Parquet data in batches from a single file")
               for batch in parquet_data.iter_batches(batch_size=batch_size):
                   yield batch
       except Exception as e:
           logging.error(f"Error streaming Parquet batches: {e}")
           raise e
   ```
   
   **A few important findings**
   
   - Regarding @lidavidm's question below:
   
   _"Do you know if (1) the python server thinks the RPC is complete, (2) the 
Java client got any (or all) of the data before getting stuck? It may also be 
interesting to step though the Java code with a debugger attached, and see what 
the values of `pending` and `completed` are in the FlightStream instance, and 
if the methods here[1] are all being hit as expected."_
   
   [1] 
https://github.com/apache/arrow-java/blob/b9e37f0ccecc2651fec3487472c203bd223290e8/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightClient.java#L356
   
   1. Does the Python server think the RPC is complete? Yes.
   2. Did the Java client get any (or all) of the data before getting stuck? It depends: with a batch size of 1000 (a fairly low number), data is streamed until roughly 15,000-18,000 rows and then the client gets stuck; with a batch size of 100K, the client is stuck indefinitely. (An instrumentation sketch that could pin down the stalling batch is included after the screenshot below.)
   3. Sharing the debug screenshot below; the method was invoked as expected.
   
   <img width="1214" alt="Image" src="https://github.com/user-attachments/assets/509108e4-ae53-4485-ac2d-86da1ccdea47" />
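   
   A sketch of instrumentation that could pin down the stalling batch (assumes the `logger` and the `FlightStream` from the client snippet at the top; not something we have run yet):
   
   ```
   import org.apache.arrow.flight.FlightStream
   
   // Log each batch with its row count and the time spent blocked in next(),
   // so we can see exactly which batch the client stalls on.
   def drainWithTiming(stream: FlightStream): Long = {
     var batchIndex = 0
     var totalRows = 0L
     var waitStart = System.nanoTime()
     while (stream.next()) {
       val waitMs = (System.nanoTime() - waitStart) / 1000000
       val rows = stream.getRoot.getRowCount
       totalRows += rows
       logger.info(s"batch=$batchIndex rows=$rows totalRows=$totalRows waitMs=$waitMs")
       batchIndex += 1
       waitStart = System.nanoTime()
     }
     totalRows
   }
   ```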
   

