martin-g commented on code in PR #1527:
URL:
https://github.com/apache/datafusion-ballista/pull/1527#discussion_r3031851810
##########
ballista/core/src/execution_plans/shuffle_reader.rs:
##########
@@ -92,8 +94,21 @@ impl ShuffleReaderExec {
partition,
metrics: ExecutionPlanMetricsSet::new(),
properties,
+ work_dir: "".to_string(), // to be updated at the executor side
Review Comment:
IMO it would be safer to make it an `Option<String>` with `None` as default.
This way if it is not updated later to `Some` you can fail early.
Currently there is a chance of silently using the current working directory if
it is not updated for any reason.
##########
ballista/executor/src/flight_service.rs:
##########
@@ -251,14 +259,34 @@ impl FlightService for BallistaFlightService {
decode_protobuf(&action.body).map_err(|e|
from_ballista_err(&e))?;
match &action {
- BallistaAction::FetchPartition { path, .. } => {
- debug!("FetchPartition reading {path}");
- let data_path = Path::new(path);
+ BallistaAction::FetchPartition {
+ job_id,
+ stage_id,
+ partition_id,
+ file_id,
+ is_sort_shuffle,
+ ..
+ } => {
+ let path = create_shuffle_path(
+ &self.work_dir,
+ job_id,
+ *stage_id,
+ *partition_id,
+ *file_id,
+ *is_sort_shuffle,
+ )
+ .map_err(|e| {
+ Status::internal(format!(
+ "I/O error, cant create shuffle path: {e}"
+ ))
+ })?;
+
+ debug!("FetchPartition reading {path:?}");
// Block transport doesn't support sort-based shuffle
because it
// transfers the entire file, which contains all
partitions.
// Use flight transport (do_get) for sort-based
shuffle.
- if is_sort_shuffle_output(data_path) {
+ if is_sort_shuffle_output(&path) {
Review Comment:
Shouldn't this use `is_sort_shuffle` now?
##########
ballista/executor/src/flight_service.rs:
##########
@@ -136,10 +144,10 @@ impl FlightService for BallistaFlightService {
}
// Standard hash-based shuffle - read the entire file
- let file = File::open(path)
+ let file = File::open(path.clone())
Review Comment:
```suggestion
let file = File::open(&path)
```
##########
ballista/core/src/serde/scheduler/mod.rs:
##########
@@ -86,8 +92,24 @@ pub struct PartitionLocation {
pub executor_meta: ExecutorMetadata,
/// Statistics about the partition data.
pub partition_stats: PartitionStats,
- /// File path to the partition data.
- pub path: String,
+ /// shuffle file id
+ pub file_id: Option<u64>,
+ /// is shuffle partition sort partition
+ pub is_sort_shuffle: bool,
+}
+
+impl PartitionLocation {
+ /// creates file actuall file location
Review Comment:
```suggestion
/// creates the actual file location
```
##########
ballista/core/src/execution_plans/mod.rs:
##########
@@ -25,10 +25,122 @@ mod shuffle_writer_trait;
pub mod sort_shuffle;
mod unresolved_shuffle;
+use std::path::{Path, PathBuf};
+
+use datafusion::common::exec_err;
pub use distributed_query::DistributedQueryExec;
pub use shuffle_reader::ShuffleReaderExec;
pub use shuffle_reader::{stats_for_partition, stats_for_partitions};
pub use shuffle_writer::ShuffleWriterExec;
pub use shuffle_writer_trait::ShuffleWriter;
pub use sort_shuffle::SortShuffleWriterExec;
pub use unresolved_shuffle::UnresolvedShuffleExec;
+
+/// Creates the file path for a shuffle output partition.
+///
+/// The path structure depends on the shuffle type:
+///
+/// - **Hash shuffle** (`is_sort_shuffle = false`): produces one directory per
output
+/// partition; `partition_id` is always part of the path. `file_id` is an
optional
+/// sequence number used when a single partition is written in multiple
files:
+/// - With `file_id`:
`{work_dir}/{job_id}/{stage_id}/{partition_id}/data-{file_id}.arrow`
+/// - Without `file_id`:
`{work_dir}/{job_id}/{stage_id}/{partition_id}/data.arrow`
+///
+/// - **Sort shuffle** (`is_sort_shuffle = true`): produces a single output
partition,
+/// so `partition_id` is **ignored** and not included in the path. `file_id`
acts as a
+/// file sequence counter and is **required**:
+/// - With `file_id`: `{work_dir}/{job_id}/{stage_id}/{file_id}/data.arrow`
+/// - Without `file_id`: returns an error
+///
+/// # Arguments
+///
+/// - `work_dir` — base directory where shuffle files are written
+/// - `job_id` — unique identifier for the job
+/// - `stage_id` — stage within the job that produced this shuffle output
+/// - `partition_id` — output partition index; used by hash shuffle only,
ignored for sort shuffle
+/// - `file_id` — file sequence number; optional for hash shuffle, required
for sort shuffle
+/// - `is_sort_shuffle` — selects between sort-shuffle and hash-shuffle path
layout
+pub fn create_shuffle_path<P: AsRef<Path>>(
+ work_dir: P,
+ job_id: &str,
+ stage_id: usize,
+ partition_id: usize,
+ file_id: Option<u64>,
+ is_sort_shuffle: bool,
+) -> datafusion::error::Result<PathBuf> {
+ let mut path = PathBuf::new();
+
+ path.push(work_dir);
+ path.push(job_id);
+ path.push(stage_id.to_string());
+
+ match (file_id, is_sort_shuffle) {
+ (Some(file_id), false) => {
+ path.push(partition_id.to_string());
+ path.push(format!("data-{}.arrow", file_id));
+ }
+ (Some(file_id), true) => {
+ path.push(file_id.to_string());
+ path.push("data.arrow");
+ }
+ (None, false) => {
+ path.push(partition_id.to_string());
+ path.push("data.arrow");
+ }
+ (None, true) => {
+ exec_err!("cant create path for sort shuffle without file_id
provided")?
Review Comment:
```suggestion
exec_err!("can't create path for sort shuffle without file_id
provided")?
```
##########
ballista/core/src/execution_plans/shuffle_reader.rs:
##########
@@ -414,11 +433,11 @@ fn send_fetch_partitions(
//
// fetching local partitions (read from file)
//
-
+ let work_dir = work_dir.to_string();
spawned_tasks.push(SpawnedTask::spawn_blocking({
move || {
for p in local_locations {
- let r = fetch_partition_local(&p, sort_shuffle_enabled);
+ let r = fetch_partition_local(work_dir.clone(), &p,
sort_shuffle_enabled);
Review Comment:
```suggestion
let r = fetch_partition_local(&work_dir, &p,
sort_shuffle_enabled);
```
##########
ballista/executor/src/flight_service.rs:
##########
@@ -251,14 +259,34 @@ impl FlightService for BallistaFlightService {
decode_protobuf(&action.body).map_err(|e|
from_ballista_err(&e))?;
match &action {
- BallistaAction::FetchPartition { path, .. } => {
- debug!("FetchPartition reading {path}");
- let data_path = Path::new(path);
+ BallistaAction::FetchPartition {
+ job_id,
+ stage_id,
+ partition_id,
+ file_id,
+ is_sort_shuffle,
+ ..
+ } => {
+ let path = create_shuffle_path(
+ &self.work_dir,
+ job_id,
+ *stage_id,
+ *partition_id,
+ *file_id,
+ *is_sort_shuffle,
+ )
+ .map_err(|e| {
+ Status::internal(format!(
+ "I/O error, cant create shuffle path: {e}"
Review Comment:
```suggestion
"I/O error, can't create shuffle path: {e}"
```
##########
ballista/executor/src/flight_service.rs:
##########
@@ -100,21 +96,33 @@ impl FlightService for BallistaFlightService {
match &action {
BallistaAction::FetchPartition {
- path, partition_id, ..
+ job_id,
+ stage_id,
+ partition_id,
+ file_id,
+ is_sort_shuffle,
+ ..
} => {
- debug!("FetchPartition reading partition {partition_id} from
{path}");
- let data_path = Path::new(path);
+ let path = create_shuffle_path(
+ &self.work_dir,
+ job_id,
+ *stage_id,
+ *partition_id,
+ *file_id,
+ *is_sort_shuffle,
+ )
+ .map_err(|e| {
+ Status::internal(format!("I/O error, cant create shuffle
path: {e}"))
Review Comment:
```suggestion
Status::internal(format!("I/O error, can't create
shuffle path: {e}"))
```
##########
ballista/core/src/execution_plans/shuffle_reader.rs:
##########
Review Comment:
Should this use `location.is_sort_shuffle`?
##########
ballista/core/src/serde/scheduler/mod.rs:
##########
@@ -44,12 +46,16 @@ pub enum Action {
stage_id: usize,
/// The partition identifier within the stage.
partition_id: usize,
- /// File path to the partition data.
- path: String,
+ // /// File path to the partition data.
+ // path: String,
/// Hostname or IP address of the executor hosting this partition.
host: String,
/// Port number for data transfer.
port: u16,
+ /// shuffle file block id
+ file_id: Option<u64>,
+ /// is shuffle partition sort partition
Review Comment:
```suggestion
/// whether this partition uses sort shuffle
```
##########
ballista/core/src/serde/scheduler/mod.rs:
##########
@@ -86,8 +92,24 @@ pub struct PartitionLocation {
pub executor_meta: ExecutorMetadata,
/// Statistics about the partition data.
pub partition_stats: PartitionStats,
- /// File path to the partition data.
- pub path: String,
+ /// shuffle file id
+ pub file_id: Option<u64>,
+ /// is shuffle partition sort partition
Review Comment:
```suggestion
/// whether this partition uses sort shuffle
```
##########
ballista/core/proto/ballista.proto:
##########
@@ -230,9 +230,14 @@ message FetchPartition {
string job_id = 1;
uint32 stage_id = 2;
uint32 partition_id = 3;
- string path = 4;
string host = 5;
uint32 port = 6;
+ optional uint64 file_id = 7;
+ bool is_sort_shuffle = 8 ;
Review Comment:
```suggestion
bool is_sort_shuffle = 8;
```
##########
ballista/core/proto/ballista.proto:
##########
@@ -230,9 +230,14 @@ message FetchPartition {
string job_id = 1;
uint32 stage_id = 2;
uint32 partition_id = 3;
- string path = 4;
string host = 5;
uint32 port = 6;
+ optional uint64 file_id = 7;
+ bool is_sort_shuffle = 8 ;
+
+ // reserved after removing deprecated `path`.
+ reserved 4;
+ reserved "path";
Review Comment:
```suggestion
reserved "path";
```
##########
ballista/core/src/execution_plans/shuffle_reader.rs:
##########
@@ -508,15 +525,22 @@ async fn fetch_partition_remote(
})?;
ballista_client
- .fetch_partition(&metadata.id, partition_id, &location.path,
prefer_flight)
+ .fetch_partition(
+ &metadata.id,
+ partition_id,
+ file_id,
+ is_sort_shuffle,
+ prefer_flight,
+ )
.await
}
fn fetch_partition_local(
+ work_dir: String,
location: &PartitionLocation,
sort_shuffle_enabled: bool,
) -> result::Result<SendableRecordBatchStream, BallistaError> {
- let path = &location.path;
+ let path = &location.path(&work_dir)?;
Review Comment:
```suggestion
let path = &location.path(work_dir)?;
```
##########
ballista/core/src/serde/scheduler/mod.rs:
##########
@@ -44,12 +46,16 @@ pub enum Action {
stage_id: usize,
/// The partition identifier within the stage.
partition_id: usize,
- /// File path to the partition data.
- path: String,
+ // /// File path to the partition data.
+ // path: String,
Review Comment:
```suggestion
```
##########
ballista/core/src/execution_plans/shuffle_reader.rs:
##########
@@ -1022,21 +1051,34 @@ mod tests {
RecordBatch::try_new(Arc::new(schema.clone()),
vec![Arc::new(data_array)])
.unwrap();
let tmp_dir = tempdir().unwrap();
- let file_path = tmp_dir.path().join("shuffle_data");
+ let work_dir = tmp_dir.path();
+
+ // job name and stage id are hard-codded
Review Comment:
```suggestion
// job name and stage id are hard-coded
```
##########
ballista/core/src/client.rs:
##########
@@ -146,22 +149,26 @@ impl BallistaClient {
/// The block-based transfer is optimized for performance and reduces
computational overhead on the server.
///
/// This method should be used if the request may be proxied.
+ #[allow(clippy::too_many_arguments)]
pub async fn fetch_partition_proxied(
&mut self,
executor_id: &str,
partition_id: &PartitionId,
+ file_id: Option<u64>,
+ is_sort_shuffle: bool,
host: &str,
port: u16,
- path: &str,
flight_transport: bool,
) -> BResult<SendableRecordBatchStream> {
let action = Action::FetchPartition {
job_id: partition_id.job_id.clone(),
stage_id: partition_id.stage_id,
partition_id: partition_id.partition_id,
- path: path.to_owned(),
+ // path: path.to_owned(),
Review Comment:
```suggestion
```
##########
ballista/core/src/execution_plans/shuffle_reader.rs:
##########
@@ -508,15 +525,22 @@ async fn fetch_partition_remote(
})?;
ballista_client
- .fetch_partition(&metadata.id, partition_id, &location.path,
prefer_flight)
+ .fetch_partition(
+ &metadata.id,
+ partition_id,
+ file_id,
+ is_sort_shuffle,
+ prefer_flight,
+ )
.await
}
fn fetch_partition_local(
+ work_dir: String,
Review Comment:
```suggestion
work_dir: &str,
```
##########
ballista/core/src/execution_plans/sort_shuffle/writer.rs:
##########
@@ -312,10 +313,12 @@ impl SortShuffleWriterExec {
if num_rows > 0 {
results.push(ShuffleWritePartition {
partition_id: part_id as u64,
- path: data_path.to_string_lossy().to_string(),
+ //path: data_path.to_string_lossy().to_string(),
Review Comment:
```suggestion
```
##########
ballista/executor/src/executor_process.rs:
##########
@@ -442,11 +443,12 @@ pub async fn start_executor_process(
};
let shutdown = shutdown_notification.subscribe_for_shutdown();
let override_flight = opt.override_arrow_flight_service.clone();
-
+ //let wd = work_dir.clone();
Review Comment:
```suggestion
```
##########
ballista/executor/src/lib.rs:
##########
@@ -65,12 +65,14 @@ use ballista_core::utils::GrpcServerConfig;
/// [ArrowFlightServerProvider] provides a function which creates a new Arrow
Flight server.
///
/// The function should take two arguments:
Review Comment:
```suggestion
/// The function should take four arguments:
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]