Susmit07 opened a new issue, #753: URL: https://github.com/apache/arrow-java/issues/753
### Describe the bug, including details regarding any error messages, version, and platform.

Hi Team,

We are using the Scala code snippet below to query the Flight server, but it seems to get stuck indefinitely. The issue shows up when we test from our local workstations (Macs, to be precise). Another interesting thing: error messages are propagated correctly, but the FlightStream data is not, and the same code works fine when we run it inside a Linux VM. Do you folks see any issue in the code?

```
def fetchDataStreamIterator(details: BaseDataAccessDetails): Iterator[FlightStream] = {
  logger.info(s"Fetching data for details: ${details.toString}")
  val ticketStr = buildTicketStr(details)
  logger.info(s"Generated ticket string: $ticketStr")

  val allocator = new RootAllocator(Long.MaxValue)
  val client = FlightClient.builder(allocator, Location.forGrpcInsecure(serverHost, serverPort)).build()

  try {
    val ticket = new Ticket(ticketStr.getBytes(StandardCharsets.UTF_8))
    val stream = client.getStream(ticket)

    Iterator.continually {
      if (stream.next()) Some(stream)
      else {
        // Cleanup when no more batches
        close(stream, client)
        None
      }
    }.takeWhile(_.isDefined).flatten
  } catch {
    case e: FlightRuntimeException =>
      logger.error(s"Error communicating with Flight server: ${e.getMessage}")
      throw new CefFlightServerException(s"Error communicating with Flight server: ${e.getMessage}", e)
    case e: Exception =>
      logger.error(s"Failed to fetch data: ${e.getMessage}")
      throw new CefFlightServerException("Failed to fetch data from Flight Server", e)
  }
}
```

Attaching the client thread dump for reference: [threads_report.txt](https://github.com/user-attachments/files/20182747/threads_report.txt)

Strangely, if we pass a dummy, non-existent S3 path we get a proper error back from the server:

```
cef_flight_server.exceptions.S3AccessError: Failed to access S3: [Errno 2] Path does not exist 'bg0975-cef-ccmedev-data/pp/load_date=2024-11-21/part-00007.c008.snappy.parquet'. Detail: [errno 2] No such file or directory
```

This indicates the server is reachable, and we do see the corresponding logs on the server side as well.
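To help isolate whether the lazily evaluated `Iterator.continually` wrapper (and its deferred cleanup of the stream, client, and allocator) plays a role, a minimal eager consumer along the lines of the sketch below could be useful. This is only a sketch, not our production code; `serverHost`, `serverPort`, and `ticketStr` are assumed from the snippet above:

```
import java.nio.charset.StandardCharsets

import org.apache.arrow.flight.{FlightClient, Location, Ticket}
import org.apache.arrow.memory.RootAllocator

// Diagnostic sketch: drain the DoGet stream eagerly in the calling thread and
// close stream, client, and allocator explicitly, instead of deferring cleanup
// to the lazy Iterator. serverHost, serverPort, and ticketStr are assumed above.
def drainStream(serverHost: String, serverPort: Int, ticketStr: String): Long = {
  val allocator = new RootAllocator(Long.MaxValue)
  val client = FlightClient.builder(allocator, Location.forGrpcInsecure(serverHost, serverPort)).build()
  val stream = client.getStream(new Ticket(ticketStr.getBytes(StandardCharsets.UTF_8)))
  var rows = 0L
  try {
    while (stream.next()) {
      rows += stream.getRoot.getRowCount // VectorSchemaRoot of the current batch
      println(s"Received batch, total rows so far: $rows")
    }
    rows
  } finally {
    // Close in reverse order of creation so the allocator can flag any leaks.
    stream.close()
    client.close()
    allocator.close()
  }
}
```

If this eager loop hangs at the same point, the iterator wrapper can be ruled out and the stall is more likely in the transport / flow-control layer.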
The Flight server is written in Python. Here is the `do_get` implementation:

```
def do_get(self, context, ticket):
    """
    Handles client requests for data.
    The ticket will contain:
      - access_key: S3 access key
      - secret_key: S3 secret key
      - s3_path: Full S3 path (e.g., bucket_name/object_key)
      - mode: 'batch' (for batch streaming) or 'full' (for loading the entire dataset)
    """
    try:
        # Parse the ticket to extract credentials, S3 path, and mode
        access_key, secret_key, s3_path, mode, batch_size = parse_ticket(ticket)
    except InvalidTicketFormatError as e:
        logging.error(str(e))
        raise
    except InvalidModeError as e:
        logging.error(str(e))
        raise

    # s3fs doesn't need the s3a protocol.
    if s3_path.startswith("s3://"):
        s3_path = s3_path.replace("s3://", "", 1)

    logging.info(f"Cloudian S3 Override endpoint: {Config.S3_ENDPOINT_OVERRIDE}")
    logging.info(f"Cloudian S3 Region: {Config.S3_REGION}")
    logging.info(f"Fetching Parquet data from S3: {s3_path} in mode: {mode}")

    # Initialize the S3 handler with credentials
    try:
        s3_handler = S3Handler(
            endpoint=Config.S3_ENDPOINT_OVERRIDE,
            region=Config.S3_REGION,
            access_key=access_key,
            secret_key=secret_key
        )
    except Exception as e:
        logging.error(f"Error initializing S3 handler: {str(e)}")
        raise S3AccessError(f"Error initializing S3 handler: {str(e)}") from e

    if mode == DataRetrievalMode.BATCH:
        try:
            # Use the get_parquet_data method for both wildcard and non-wildcard cases
            parquet_data = s3_handler.get_parquet_data(s3_path)

            # parquet_data.schema: used when parquet_data is an instance of ds.Dataset
            # (i.e., when multiple Parquet files are being processed as a dataset).
            #
            # parquet_data.schema_arrow: used when parquet_data is a pyarrow.parquet.ParquetFile.
            # A single Parquet file has its own schema, accessible via schema_arrow in PyArrow.
            schema = parquet_data.schema if isinstance(parquet_data, ds.Dataset) else parquet_data.schema_arrow
            return flight.GeneratorStream(schema, s3_handler.stream_parquet_batches(parquet_data, batch_size))
        except OSError as e:
            logging.error(f"AWS S3 access error: {str(e)}")
            raise S3AccessError(f"Failed to access S3: {str(e)}") from e
        except Exception as e:
            logging.error(f"Error streaming Parquet data: {str(e)}")
            raise DataProcessingError(f"Error streaming Parquet data: {str(e)}") from e

    # Handle 'full' mode to load the entire dataset
    elif mode == DataRetrievalMode.FULL:
        try:
            # Check if the S3 path contains a wildcard and the mode is FULL
            if "*" in s3_path:
                logging.warning(
                    f"Wildcard pattern detected in S3 path '{s3_path}' with FULL data retrieval mode. "
                    f"This may put pressure on memory as all files will be loaded into memory at once."
                )

            # Use the same get_parquet_data method for both wildcard and non-wildcard cases
            parquet_data = s3_handler.get_parquet_data(s3_path)

            # Load the entire dataset into memory (chance of OOM),
            # with consideration for Dataset vs. ParquetFile
            if isinstance(parquet_data, ds.Dataset):
                table = parquet_data.to_table()
            else:
                table = parquet_data.read()

            return flight.RecordBatchStream(table)
        except OSError as e:
            logging.error(f"AWS S3 access error: {str(e)}")
            raise S3AccessError(f"Failed to access S3: {str(e)}") from e
        except Exception as e:
            logging.error(f"Error loading full Parquet dataset: {str(e)}")
            raise DataProcessingError(f"Error loading full Parquet dataset: {str(e)}") from e
    else:
        logging.error(f"Invalid mode: {DataRetrievalMode.from_string(mode)}. Expected 'batch' or 'full'.")
        raise InvalidModeError()


# Helper functions.
def get_parquet_data(self, s3_path):
    """
    Retrieves Parquet data from S3.
    If the path contains a wildcard pattern, it lists all matching files manually.
    If it's a single file, it reads the file directly.

    :param s3_path: The S3 path, which could be a wildcard pattern or a direct file path.
    :return: PyArrow Dataset object if it's a wildcard, or a ParquetFile object for a single file.
""" try: # Check if the path contains a wildcard if "*" in s3_path: # Split the directory and pattern (e.g., `*.parquet`) directory, pattern = s3_path.rsplit("/", 1) # List all files in the directory and filter using the pattern [logging.info](http://logging.info/)(f"Fetching Parquet files matching wildcard: {s3_path}") files = self.s3_fs.get_file_info(fs.FileSelector(directory)) # Filter files matching the pattern (e.g., *.parquet) and sort them by modification time (mtime_ns) sorted_file_paths = [file.path for file in sorted(files, key=lambda file: file.mtime_ns) if fnmatch.fnmatch(file.path, f"{directory}/{pattern}")] if not sorted_file_paths: raise FileNotFoundError(f"No files matching pattern {pattern} found in {directory}") [logging.info](http://logging.info/)(f"Sorted files: {sorted_file_paths}") # Validate schemas across all files if not validate_schemas(sorted_file_paths, self.s3_fs): raise ValueError("Schema mismatch detected across files.") # Create a dataset from the matching files dataset = ds.dataset(sorted_file_paths, format="parquet", filesystem=self.s3_fs) return dataset else: # Handle single file case: read the specific Parquet file [logging.info](http://logging.info/)(f"Fetching single Parquet file: {s3_path}") parquet_file = pq.ParquetFile(self.s3_fs.open_input_file(s3_path)) return parquet_file except Exception as e: logging.error(f"Error fetching Parquet data from S3: {e}") raise e @staticmethod def stream_parquet_batches(parquet_data, batch_size=None): """ Stream the Parquet data in batches. Supports both datasets (multiple files) and single Parquet files. :param parquet_data: The Dataset or ParquetFile object to stream data from. :param batch_size: The size of the batches to stream. Default is 100,000 if not provided. :return: Generator for streaming Parquet batches. """ try: # Ensure batch_size is an integer, set default if None if batch_size is None or not isinstance(batch_size, int): batch_size = 100000 if isinstance(parquet_data, ds.Dataset): # If it's a dataset (multiple files), stream dataset batches using `int_batch_size` [logging.info](http://logging.info/)(f"Streaming Parquet data in batches from a dataset") for batch in parquet_data.to_batches(batch_size=batch_size): yield batch else: # If it's a single file (ParquetFile), stream file batches (iter_batches) [logging.info](http://logging.info/)(f"Streaming Parquet data in batches from a single file") for batch in parquet_data.iter_batches(batch_size=batch_size): yield batch except Exception as e: logging.error(f"Error streaming Parquet batches: {e}") raise e ``` **Few important Findings** - On @lidavidm question below _"Do you know if (1) the python server thinks the RPC is complete, (2) the Java client got any (or all) of the data before getting stuck? It may also be interesting to step though the Java code with a debugger attached, and see what the values of `pending` and `completed` are in the FlightStream instance, and if the methods here[1] are all being hit as expected."_ [1] https://github.com/apache/arrow-java/blob/b9e37f0ccecc2651fec3487472c203bd223290e8/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightClient.java#L356 1. The python server thinks the RPC is complete: Yes 2. the Java client got any (or all) of the data before getting stuck? Depends, if we are reading in batches of 1000 which is pretty low number we see data being streamed till 15000-18000 rows ultimately stuck, if we select a batch size of 100K client is stuck indefinitely 3. 
3. Sharing the debug screenshot below; the method was invoked as expected.

<img width="1214" alt="Image" src="https://github.com/user-attachments/assets/509108e4-ae53-4485-ac2d-86da1ccdea47" />
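As a next step, to make the hang fail fast instead of blocking forever, we could also attach a per-call deadline to the DoGet call so that a stall surfaces as an exception with a stack trace. Below is a rough sketch, assuming `CallOptions.timeout` from flight-core is the right knob; the host, port, and ticket string are placeholders, not our real values:

```
import java.nio.charset.StandardCharsets
import java.util.concurrent.TimeUnit

import org.apache.arrow.flight.{CallOptions, FlightClient, Location, Ticket}
import org.apache.arrow.memory.RootAllocator

object DeadlineCheck {
  def main(args: Array[String]): Unit = {
    // Placeholders: substitute the real server host/port and ticket string.
    val (serverHost, serverPort, ticketStr) = ("localhost", 8815, "dummy-ticket")
    val allocator = new RootAllocator(Long.MaxValue)
    val client = FlightClient.builder(allocator, Location.forGrpcInsecure(serverHost, serverPort)).build()
    val stream = client.getStream(
      new Ticket(ticketStr.getBytes(StandardCharsets.UTF_8)),
      // Deadline on the whole call: a stall should then end in a
      // timeout-flavoured FlightRuntimeException rather than an indefinite hang.
      CallOptions.timeout(120, TimeUnit.SECONDS)
    )
    try {
      var rows = 0L
      while (stream.next()) rows += stream.getRoot.getRowCount
      println(s"Finished cleanly with $rows rows")
    } finally {
      stream.close()
      client.close()
      allocator.close()
    }
  }
}
```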