martin-g commented on code in PR #1527:
URL: 
https://github.com/apache/datafusion-ballista/pull/1527#discussion_r3031851810


##########
ballista/core/src/execution_plans/shuffle_reader.rs:
##########
@@ -92,8 +94,21 @@ impl ShuffleReaderExec {
             partition,
             metrics: ExecutionPlanMetricsSet::new(),
             properties,
+            work_dir: "".to_string(), // to be updated at the executor side

Review Comment:
   IMO it would be safer to make it an `Option<String>` with `None` as the default.
   This way, if it is not updated to `Some` later, you can fail early.
   Currently there is a chance of silently using the current working directory if it is not updated for any reason.



##########
ballista/executor/src/flight_service.rs:
##########
@@ -251,14 +259,34 @@ impl FlightService for BallistaFlightService {
                     decode_protobuf(&action.body).map_err(|e| 
from_ballista_err(&e))?;
 
                 match &action {
-                    BallistaAction::FetchPartition { path, .. } => {
-                        debug!("FetchPartition reading {path}");
-                        let data_path = Path::new(path);
+                    BallistaAction::FetchPartition {
+                        job_id,
+                        stage_id,
+                        partition_id,
+                        file_id,
+                        is_sort_shuffle,
+                        ..
+                    } => {
+                        let path = create_shuffle_path(
+                            &self.work_dir,
+                            job_id,
+                            *stage_id,
+                            *partition_id,
+                            *file_id,
+                            *is_sort_shuffle,
+                        )
+                        .map_err(|e| {
+                            Status::internal(format!(
+                                "I/O error, cant create shuffle path: {e}"
+                            ))
+                        })?;
+
+                        debug!("FetchPartition reading {path:?}");
 
                         // Block transport doesn't support sort-based shuffle 
because it
                         // transfers the entire file, which contains all 
partitions.
                         // Use flight transport (do_get) for sort-based 
shuffle.
-                        if is_sort_shuffle_output(data_path) {
+                        if is_sort_shuffle_output(&path) {

Review Comment:
   Shouldn't this use `is_sort_shuffle` now?



##########
ballista/executor/src/flight_service.rs:
##########
@@ -136,10 +144,10 @@ impl FlightService for BallistaFlightService {
                 }
 
                 // Standard hash-based shuffle - read the entire file
-                let file = File::open(path)
+                let file = File::open(path.clone())

Review Comment:
   ```suggestion
                   let file = File::open(&path)
   ```



##########
ballista/core/src/serde/scheduler/mod.rs:
##########
@@ -86,8 +92,24 @@ pub struct PartitionLocation {
     pub executor_meta: ExecutorMetadata,
     /// Statistics about the partition data.
     pub partition_stats: PartitionStats,
-    /// File path to the partition data.
-    pub path: String,
+    /// shuffle file id
+    pub file_id: Option<u64>,
+    /// is shuffle partition sort partition
+    pub is_sort_shuffle: bool,
+}
+
+impl PartitionLocation {
+    /// creates file actuall file location

Review Comment:
   ```suggestion
       /// creates file actual file location
   ```



##########
ballista/core/src/execution_plans/mod.rs:
##########
@@ -25,10 +25,122 @@ mod shuffle_writer_trait;
 pub mod sort_shuffle;
 mod unresolved_shuffle;
 
+use std::path::{Path, PathBuf};
+
+use datafusion::common::exec_err;
 pub use distributed_query::DistributedQueryExec;
 pub use shuffle_reader::ShuffleReaderExec;
 pub use shuffle_reader::{stats_for_partition, stats_for_partitions};
 pub use shuffle_writer::ShuffleWriterExec;
 pub use shuffle_writer_trait::ShuffleWriter;
 pub use sort_shuffle::SortShuffleWriterExec;
 pub use unresolved_shuffle::UnresolvedShuffleExec;
+
+/// Creates the file path for a shuffle output partition.
+///
+/// The path structure depends on the shuffle type:
+///
+/// - **Hash shuffle** (`is_sort_shuffle = false`): produces one directory per 
output
+///   partition; `partition_id` is always part of the path. `file_id` is an 
optional
+///   sequence number used when a single partition is written in multiple 
files:
+///   - With `file_id`: 
`{work_dir}/{job_id}/{stage_id}/{partition_id}/data-{file_id}.arrow`
+///   - Without `file_id`: 
`{work_dir}/{job_id}/{stage_id}/{partition_id}/data.arrow`
+///
+/// - **Sort shuffle** (`is_sort_shuffle = true`): produces a single output 
partition,
+///   so `partition_id` is **ignored** and not included in the path. `file_id` 
acts as a
+///   file sequence counter and is **required**:
+///   - With `file_id`: `{work_dir}/{job_id}/{stage_id}/{file_id}/data.arrow`
+///   - Without `file_id`: returns an error
+///
+/// # Arguments
+///
+/// - `work_dir` — base directory where shuffle files are written
+/// - `job_id` — unique identifier for the job
+/// - `stage_id` — stage within the job that produced this shuffle output
+/// - `partition_id` — output partition index; used by hash shuffle only, 
ignored for sort shuffle
+/// - `file_id` — file sequence number; optional for hash shuffle, required 
for sort shuffle
+/// - `is_sort_shuffle` — selects between sort-shuffle and hash-shuffle path 
layout
+pub fn create_shuffle_path<P: AsRef<Path>>(
+    work_dir: P,
+    job_id: &str,
+    stage_id: usize,
+    partition_id: usize,
+    file_id: Option<u64>,
+    is_sort_shuffle: bool,
+) -> datafusion::error::Result<PathBuf> {
+    let mut path = PathBuf::new();
+
+    path.push(work_dir);
+    path.push(job_id);
+    path.push(stage_id.to_string());
+
+    match (file_id, is_sort_shuffle) {
+        (Some(file_id), false) => {
+            path.push(partition_id.to_string());
+            path.push(format!("data-{}.arrow", file_id));
+        }
+        (Some(file_id), true) => {
+            path.push(file_id.to_string());
+            path.push("data.arrow");
+        }
+        (None, false) => {
+            path.push(partition_id.to_string());
+            path.push("data.arrow");
+        }
+        (None, true) => {
+            exec_err!("cant create path for sort shuffle without file_id 
provided")?

Review Comment:
   ```suggestion
               exec_err!("can't create path for sort shuffle without file_id 
provided")?
   ```



##########
ballista/core/src/execution_plans/shuffle_reader.rs:
##########
@@ -414,11 +433,11 @@ fn send_fetch_partitions(
     //
     // fetching local partitions (read from file)
     //
-
+    let work_dir = work_dir.to_string();
     spawned_tasks.push(SpawnedTask::spawn_blocking({
         move || {
             for p in local_locations {
-                let r = fetch_partition_local(&p, sort_shuffle_enabled);
+                let r = fetch_partition_local(work_dir.clone(), &p, 
sort_shuffle_enabled);

Review Comment:
   ```suggestion
                   let r = fetch_partition_local(&work_dir, &p, 
sort_shuffle_enabled);
   ```



##########
ballista/executor/src/flight_service.rs:
##########
@@ -251,14 +259,34 @@ impl FlightService for BallistaFlightService {
                     decode_protobuf(&action.body).map_err(|e| 
from_ballista_err(&e))?;
 
                 match &action {
-                    BallistaAction::FetchPartition { path, .. } => {
-                        debug!("FetchPartition reading {path}");
-                        let data_path = Path::new(path);
+                    BallistaAction::FetchPartition {
+                        job_id,
+                        stage_id,
+                        partition_id,
+                        file_id,
+                        is_sort_shuffle,
+                        ..
+                    } => {
+                        let path = create_shuffle_path(
+                            &self.work_dir,
+                            job_id,
+                            *stage_id,
+                            *partition_id,
+                            *file_id,
+                            *is_sort_shuffle,
+                        )
+                        .map_err(|e| {
+                            Status::internal(format!(
+                                "I/O error, cant create shuffle path: {e}"

Review Comment:
   ```suggestion
                                   "I/O error, can't create shuffle path: {e}"
   ```



##########
ballista/executor/src/flight_service.rs:
##########
@@ -100,21 +96,33 @@ impl FlightService for BallistaFlightService {
 
         match &action {
             BallistaAction::FetchPartition {
-                path, partition_id, ..
+                job_id,
+                stage_id,
+                partition_id,
+                file_id,
+                is_sort_shuffle,
+                ..
             } => {
-                debug!("FetchPartition reading partition {partition_id} from 
{path}");
-                let data_path = Path::new(path);
+                let path = create_shuffle_path(
+                    &self.work_dir,
+                    job_id,
+                    *stage_id,
+                    *partition_id,
+                    *file_id,
+                    *is_sort_shuffle,
+                )
+                .map_err(|e| {
+                    Status::internal(format!("I/O error, cant create shuffle 
path: {e}"))

Review Comment:
   ```suggestion
                       Status::internal(format!("I/O error, can't create 
shuffle path: {e}"))
   ```



##########
ballista/core/src/execution_plans/shuffle_reader.rs:
##########


Review Comment:
   Should this use `location.is_sort_shuffle`?



##########
ballista/core/src/serde/scheduler/mod.rs:
##########
@@ -44,12 +46,16 @@ pub enum Action {
         stage_id: usize,
         /// The partition identifier within the stage.
         partition_id: usize,
-        /// File path to the partition data.
-        path: String,
+        // /// File path to the partition data.
+        // path: String,
         /// Hostname or IP address of the executor hosting this partition.
         host: String,
         /// Port number for data transfer.
         port: u16,
+        /// shuffle file block id
+        file_id: Option<u64>,
+        /// is shuffle partition sort partition

Review Comment:
   ```suggestion
           /// whether this partition uses sort shuffle
   ```



##########
ballista/core/src/serde/scheduler/mod.rs:
##########
@@ -86,8 +92,24 @@ pub struct PartitionLocation {
     pub executor_meta: ExecutorMetadata,
     /// Statistics about the partition data.
     pub partition_stats: PartitionStats,
-    /// File path to the partition data.
-    pub path: String,
+    /// shuffle file id
+    pub file_id: Option<u64>,
+    /// is shuffle partition sort partition

Review Comment:
   ```suggestion
       /// whether this partition uses sort shuffle
   ```



##########
ballista/core/proto/ballista.proto:
##########
@@ -230,9 +230,14 @@ message FetchPartition {
   string job_id = 1;
   uint32 stage_id = 2;
   uint32 partition_id = 3;
-  string path = 4;
   string host = 5;
   uint32 port = 6;
+  optional uint64 file_id = 7;
+  bool is_sort_shuffle = 8 ;

Review Comment:
   ```suggestion
     bool is_sort_shuffle = 8;
   ```



##########
ballista/core/proto/ballista.proto:
##########
@@ -230,9 +230,14 @@ message FetchPartition {
   string job_id = 1;
   uint32 stage_id = 2;
   uint32 partition_id = 3;
-  string path = 4;
   string host = 5;
   uint32 port = 6;
+  optional uint64 file_id = 7;
+  bool is_sort_shuffle = 8 ;
+  
+  // reserved after removing deprecated `path`.
+  reserved 4;
+  reserved "path"; 

Review Comment:
   ```suggestion
     reserved "path";
   ```



##########
ballista/core/src/execution_plans/shuffle_reader.rs:
##########
@@ -508,15 +525,22 @@ async fn fetch_partition_remote(
             })?;
 
     ballista_client
-        .fetch_partition(&metadata.id, partition_id, &location.path, 
prefer_flight)
+        .fetch_partition(
+            &metadata.id,
+            partition_id,
+            file_id,
+            is_sort_shuffle,
+            prefer_flight,
+        )
         .await
 }
 
 fn fetch_partition_local(
+    work_dir: String,
     location: &PartitionLocation,
     sort_shuffle_enabled: bool,
 ) -> result::Result<SendableRecordBatchStream, BallistaError> {
-    let path = &location.path;
+    let path = &location.path(&work_dir)?;

Review Comment:
   ```suggestion
       let path = &location.path(work_dir)?;
   ```



##########
ballista/core/src/serde/scheduler/mod.rs:
##########
@@ -44,12 +46,16 @@ pub enum Action {
         stage_id: usize,
         /// The partition identifier within the stage.
         partition_id: usize,
-        /// File path to the partition data.
-        path: String,
+        // /// File path to the partition data.
+        // path: String,

Review Comment:
   ```suggestion
   ```



##########
ballista/core/src/execution_plans/shuffle_reader.rs:
##########
@@ -1022,21 +1051,34 @@ mod tests {
             RecordBatch::try_new(Arc::new(schema.clone()), 
vec![Arc::new(data_array)])
                 .unwrap();
         let tmp_dir = tempdir().unwrap();
-        let file_path = tmp_dir.path().join("shuffle_data");
+        let work_dir = tmp_dir.path();
+
+        // job name and stage id are hard-codded

Review Comment:
   ```suggestion
           // job name and stage id are hard-coded
   ```



##########
ballista/core/src/client.rs:
##########
@@ -146,22 +149,26 @@ impl BallistaClient {
     /// The block-based transfer is optimized for performance and reduces 
computational overhead on the server.
     ///
     /// This method should be used if the request may be proxied.
+    #[allow(clippy::too_many_arguments)]
     pub async fn fetch_partition_proxied(
         &mut self,
         executor_id: &str,
         partition_id: &PartitionId,
+        file_id: Option<u64>,
+        is_sort_shuffle: bool,
         host: &str,
         port: u16,
-        path: &str,
         flight_transport: bool,
     ) -> BResult<SendableRecordBatchStream> {
         let action = Action::FetchPartition {
             job_id: partition_id.job_id.clone(),
             stage_id: partition_id.stage_id,
             partition_id: partition_id.partition_id,
-            path: path.to_owned(),
+            // path: path.to_owned(),

Review Comment:
   ```suggestion
   ```



##########
ballista/core/src/execution_plans/shuffle_reader.rs:
##########
@@ -508,15 +525,22 @@ async fn fetch_partition_remote(
             })?;
 
     ballista_client
-        .fetch_partition(&metadata.id, partition_id, &location.path, 
prefer_flight)
+        .fetch_partition(
+            &metadata.id,
+            partition_id,
+            file_id,
+            is_sort_shuffle,
+            prefer_flight,
+        )
         .await
 }
 
 fn fetch_partition_local(
+    work_dir: String,

Review Comment:
   ```suggestion
       work_dir: &str,
   ```



##########
ballista/core/src/execution_plans/sort_shuffle/writer.rs:
##########
@@ -312,10 +313,12 @@ impl SortShuffleWriterExec {
                 if num_rows > 0 {
                     results.push(ShuffleWritePartition {
                         partition_id: part_id as u64,
-                        path: data_path.to_string_lossy().to_string(),
+                        //path: data_path.to_string_lossy().to_string(),

Review Comment:
   ```suggestion
   ```



##########
ballista/executor/src/executor_process.rs:
##########
@@ -442,11 +443,12 @@ pub async fn start_executor_process(
     };
     let shutdown = shutdown_notification.subscribe_for_shutdown();
     let override_flight = opt.override_arrow_flight_service.clone();
-
+    //let wd = work_dir.clone();

Review Comment:
   ```suggestion
   ```



##########
ballista/executor/src/lib.rs:
##########
@@ -65,12 +65,14 @@ use ballista_core::utils::GrpcServerConfig;
 /// [ArrowFlightServerProvider] provides a function which creates a new Arrow 
Flight server.
 ///
 /// The function should take two arguments:

Review Comment:
   ```suggestion
   /// The function should take four arguments:
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to