hcrosse commented on code in PR #1537:
URL: 
https://github.com/apache/datafusion-ballista/pull/1537#discussion_r3028649892


##########
benchmarks/src/bin/shuffle_bench.rs:
##########
@@ -240,19 +275,53 @@ async fn benchmark_sort_shuffle(
             output_partitions,
         ),
         config,
-    )?;
+    )?);
 
     let start = Instant::now();
 
-    // Execute all input partitions (not output partitions)
     let input_partition_count = data.len();
     let mut total_files = 0;
-    for partition in 0..input_partition_count {
-        let mut stream = shuffle_writer.execute(partition, task_ctx.clone())?;
-        let batches = utils::collect_stream(&mut stream).await?;
-        // Count output files from the result
-        if let Some(batch) = batches.first() {
-            total_files += batch.num_rows();
+
+    if concurrency <= 1 {
+        for partition in 0..input_partition_count {
+            let mut stream = shuffle_writer.execute(partition, task_ctx.clone())?;
+            let batches = utils::collect_stream(&mut stream).await?;
+            if let Some(batch) = batches.first() {
+                total_files += batch.num_rows();
+            }
+        }
+    } else {
+        let semaphore = Arc::new(tokio::sync::Semaphore::new(concurrency));

Review Comment:
   My bad 😅! I've extracted this into a helper function, `run_concurrent`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to