Jefffrey commented on code in PR #17553:
URL: https://github.com/apache/datafusion/pull/17553#discussion_r2347631214


##########
datafusion/datasource-csv/src/tests.rs:
##########


Review Comment:
   I feel this file can be included at the bottom of 
`datafusion/datasource-csv/src/file_format.rs` instead of being a separate 
file, given that it only unit tests the `build_schema_helper()` function.



##########
datafusion/core/tests/csv_schema_fix_test.rs:
##########
@@ -0,0 +1,110 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Test for CSV schema inference with different column counts (GitHub issue 
#17516)
+
+use datafusion::error::Result;
+use datafusion::prelude::*;
+use std::fs;
+use tempfile::TempDir;
+
+#[tokio::test]
+async fn test_csv_schema_inference_different_column_counts() -> Result<()> {
+    // Create temporary directory for test files
+    let temp_dir = TempDir::new().expect("Failed to create temp dir");
+    let temp_path = temp_dir.path();
+
+    // Create CSV file 1 with 3 columns (simulating older railway services 
format)
+    let csv1_content = r#"service_id,route_type,agency_id
+1,bus,agency1
+2,rail,agency2
+3,bus,agency3
+"#;
+    fs::write(temp_path.join("services_2024.csv"), csv1_content)?;
+
+    // Create CSV file 2 with 6 columns (simulating newer railway services 
format)
+    let csv2_content = 
r#"service_id,route_type,agency_id,stop_platform_change,stop_planned_platform,stop_actual_platform
+4,rail,agency2,true,Platform A,Platform B
+5,bus,agency1,false,Stop 1,Stop 1
+6,rail,agency3,true,Platform C,Platform D
+"#;
+    fs::write(temp_path.join("services_2025.csv"), csv2_content)?;
+
+    // Create DataFusion context
+    let ctx = SessionContext::new();
+
+    // This should now work (previously would have failed with column count 
mismatch)
+    // Enable truncated_rows to handle files with different column counts
+    let df = ctx
+        .read_csv(
+            temp_path.to_str().unwrap(), 
+            CsvReadOptions::new().truncated_rows(true)
+        )
+        .await
+        .expect("Should successfully read CSV directory with different column 
counts");
+
+    // Verify the schema contains all 6 columns (union of both files)
+    let df_clone = df.clone();
+    let schema = df_clone.schema();
+    assert_eq!(schema.fields().len(), 6, "Schema should contain all 6 
columns");
+
+    // Check that we have all expected columns
+    let field_names: Vec<&str> = schema.fields().iter().map(|f| 
f.name().as_str()).collect();
+    assert!(field_names.contains(&"service_id"));
+    assert!(field_names.contains(&"route_type"));
+    assert!(field_names.contains(&"agency_id"));
+    assert!(field_names.contains(&"stop_platform_change"));
+    assert!(field_names.contains(&"stop_planned_platform"));
+    assert!(field_names.contains(&"stop_actual_platform"));
+
+    // All fields should be nullable since they don't appear in all files
+    for field in schema.fields() {
+        assert!(
+            field.is_nullable(),
+            "Field {} should be nullable",
+            field.name()
+        );
+    }
+
+    // Verify we can actually read the data
+    let results = df.collect().await?;
+    
+    // Calculate total rows across all batches
+    let total_rows: usize = results.iter().map(|batch| batch.num_rows()).sum();
+    assert_eq!(total_rows, 6, "Should have 6 total rows across all batches");

Review Comment:
   I feel we should assert the actual row contents as well



##########
datafusion/core/tests/csv_schema_fix_test.rs:
##########
@@ -0,0 +1,110 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Test for CSV schema inference with different column counts (GitHub issue 
#17516)
+
+use datafusion::error::Result;
+use datafusion::prelude::*;
+use std::fs;
+use tempfile::TempDir;
+
+#[tokio::test]
+async fn test_csv_schema_inference_different_column_counts() -> Result<()> {
+    // Create temporary directory for test files
+    let temp_dir = TempDir::new().expect("Failed to create temp dir");
+    let temp_path = temp_dir.path();
+
+    // Create CSV file 1 with 3 columns (simulating older railway services 
format)
+    let csv1_content = r#"service_id,route_type,agency_id
+1,bus,agency1
+2,rail,agency2
+3,bus,agency3
+"#;
+    fs::write(temp_path.join("services_2024.csv"), csv1_content)?;
+
+    // Create CSV file 2 with 6 columns (simulating newer railway services 
format)
+    let csv2_content = 
r#"service_id,route_type,agency_id,stop_platform_change,stop_planned_platform,stop_actual_platform
+4,rail,agency2,true,Platform A,Platform B
+5,bus,agency1,false,Stop 1,Stop 1
+6,rail,agency3,true,Platform C,Platform D
+"#;
+    fs::write(temp_path.join("services_2025.csv"), csv2_content)?;
+
+    // Create DataFusion context
+    let ctx = SessionContext::new();
+
+    // This should now work (previously would have failed with column count 
mismatch)
+    // Enable truncated_rows to handle files with different column counts
+    let df = ctx
+        .read_csv(
+            temp_path.to_str().unwrap(), 
+            CsvReadOptions::new().truncated_rows(true)
+        )
+        .await
+        .expect("Should successfully read CSV directory with different column 
counts");
+
+    // Verify the schema contains all 6 columns (union of both files)
+    let df_clone = df.clone();
+    let schema = df_clone.schema();
+    assert_eq!(schema.fields().len(), 6, "Schema should contain all 6 
columns");
+
+    // Check that we have all expected columns
+    let field_names: Vec<&str> = schema.fields().iter().map(|f| 
f.name().as_str()).collect();
+    assert!(field_names.contains(&"service_id"));
+    assert!(field_names.contains(&"route_type"));
+    assert!(field_names.contains(&"agency_id"));
+    assert!(field_names.contains(&"stop_platform_change"));
+    assert!(field_names.contains(&"stop_planned_platform"));
+    assert!(field_names.contains(&"stop_actual_platform"));
+
+    // All fields should be nullable since they don't appear in all files
+    for field in schema.fields() {
+        assert!(
+            field.is_nullable(),
+            "Field {} should be nullable",
+            field.name()
+        );
+    }
+
+    // Verify we can actually read the data
+    let results = df.collect().await?;
+    
+    // Calculate total rows across all batches
+    let total_rows: usize = results.iter().map(|batch| batch.num_rows()).sum();
+    assert_eq!(total_rows, 6, "Should have 6 total rows across all batches");
+
+    // All batches should have 6 columns (the union schema)
+    for batch in &results {
+        assert_eq!(batch.num_columns(), 6, "All batches should have 6 
columns");
+    }
+
+    // Verify that the union schema is being used correctly
+    // We should be able to find records from both files
+    println!("✅ Successfully read {} record batches with {} total rows", 
results.len(), total_rows);
+    
+    // Verify schema has all expected columns
+    for batch in &results {
+        assert_eq!(batch.schema().fields().len(), 6, "Each batch should use 
the union schema with 6 fields");
+    }
+
+    println!("✅ Successfully verified CSV schema inference fix!");
+    println!("   - Read {} files with different column counts (3 vs 6)", 
temp_dir.path().read_dir().unwrap().count());
+    println!("   - Inferred schema with {} columns", schema.fields().len());
+    println!("   - Processed {} total rows", total_rows);

Review Comment:
   Could we remove these prints and keep the tests to assert statements only, 
to cut down on verbosity?



##########
datafusion/datasource-csv/src/file_format.rs:
##########
@@ -497,7 +497,20 @@ impl FileFormat for CsvFormat {
 impl CsvFormat {
     /// Return the inferred schema reading up to records_to_read from a
     /// stream of delimited chunks returning the inferred schema and the
-    /// number of lines that were read
+    /// number of lines that were read.
+    ///
+    /// This method now supports CSV files with different numbers of columns.

Review Comment:
   ```suggestion
       /// This method can handle CSV files with different numbers of columns.
   ```



##########
datafusion/core/tests/csv_schema_fix_test.rs:
##########
@@ -0,0 +1,110 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Test for CSV schema inference with different column counts (GitHub issue 
#17516)
+
+use datafusion::error::Result;
+use datafusion::prelude::*;
+use std::fs;
+use tempfile::TempDir;
+
+#[tokio::test]
+async fn test_csv_schema_inference_different_column_counts() -> Result<()> {
+    // Create temporary directory for test files
+    let temp_dir = TempDir::new().expect("Failed to create temp dir");
+    let temp_path = temp_dir.path();
+
+    // Create CSV file 1 with 3 columns (simulating older railway services 
format)
+    let csv1_content = r#"service_id,route_type,agency_id
+1,bus,agency1
+2,rail,agency2
+3,bus,agency3
+"#;
+    fs::write(temp_path.join("services_2024.csv"), csv1_content)?;
+
+    // Create CSV file 2 with 6 columns (simulating newer railway services 
format)
+    let csv2_content = 
r#"service_id,route_type,agency_id,stop_platform_change,stop_planned_platform,stop_actual_platform
+4,rail,agency2,true,Platform A,Platform B
+5,bus,agency1,false,Stop 1,Stop 1
+6,rail,agency3,true,Platform C,Platform D
+"#;
+    fs::write(temp_path.join("services_2025.csv"), csv2_content)?;
+
+    // Create DataFusion context
+    let ctx = SessionContext::new();
+
+    // This should now work (previously would have failed with column count 
mismatch)
+    // Enable truncated_rows to handle files with different column counts
+    let df = ctx
+        .read_csv(
+            temp_path.to_str().unwrap(), 
+            CsvReadOptions::new().truncated_rows(true)

Review Comment:
   If we do decide to reuse this existing config option, then we should update 
its documentation, as we're now repurposing it for something different 
(albeit similar).



##########
datafusion/datasource-csv/src/file_format.rs:
##########
@@ -560,21 +573,28 @@ impl CsvFormat {
                     })
                     .unzip();
             } else {
-                if fields.len() != column_type_possibilities.len() {
-                    return exec_err!(
-                            "Encountered unequal lengths between records on 
CSV file whilst inferring schema. \
-                             Expected {} fields, found {} fields at record {}",
-                            column_type_possibilities.len(),
-                            fields.len(),
-                            record_number + 1
-                        );
+                // Handle files with different numbers of columns by extending 
the schema
+                if fields.len() > column_type_possibilities.len() {
+                    // New columns found - extend our tracking structures
+                    for field in 
fields.iter().skip(column_type_possibilities.len()) {
+                        column_names.push(field.name().clone());
+                        let mut possibilities = HashSet::new();
+                        if records_read > 0 {
+                            possibilities.insert(field.data_type().clone());
+                        }
+                        column_type_possibilities.push(possibilities);
+                    }
+                }
+                
+                // Update type possibilities for columns that exist in this 
file
+                // We take the minimum of fields.len() and 
column_type_possibilities.len() 
+                // to avoid index out of bounds when a file has fewer columns
+                let max_fields_to_process = 
fields.len().min(column_type_possibilities.len());

Review Comment:
   Is this minimum strictly necessary? Above, if `fields.len() > 
column_type_possibilities.len()`, then `column_type_possibilities` is extended 
to match `fields`; if `fields.len() < column_type_possibilities.len()`, then we 
only need to iterate over `fields` anyway. In both cases `fields.len()` is 
already the right bound.



##########
datafusion/datasource-csv/src/file_format.rs:
##########
@@ -560,21 +573,28 @@ impl CsvFormat {
                     })
                     .unzip();
             } else {
-                if fields.len() != column_type_possibilities.len() {
-                    return exec_err!(
-                            "Encountered unequal lengths between records on 
CSV file whilst inferring schema. \
-                             Expected {} fields, found {} fields at record {}",
-                            column_type_possibilities.len(),
-                            fields.len(),
-                            record_number + 1
-                        );
+                // Handle files with different numbers of columns by extending 
the schema
+                if fields.len() > column_type_possibilities.len() {
+                    // New columns found - extend our tracking structures
+                    for field in 
fields.iter().skip(column_type_possibilities.len()) {
+                        column_names.push(field.name().clone());
+                        let mut possibilities = HashSet::new();
+                        if records_read > 0 {
+                            possibilities.insert(field.data_type().clone());
+                        }
+                        column_type_possibilities.push(possibilities);
+                    }
+                }

Review Comment:
   What happens if file A has columns t1, t3 but file B has columns t1, t2, t3?
   
   Do we only allow files whose columns are a subset of another file's 
columns, in exactly the same order?
   
   In other words, do we not support union by name?



##########
datafusion/datasource-csv/src/tests.rs:
##########
@@ -0,0 +1,125 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#[cfg(test)]
+mod tests {
+    use crate::file_format::build_schema_helper;
+    use arrow::datatypes::DataType;
+    use std::collections::HashSet;
+
+    #[test]
+    fn test_build_schema_helper_different_column_counts() {
+        // Test the core schema building logic with different column counts
+        let mut column_names = vec!["col1".to_string(), "col2".to_string(), 
"col3".to_string()];
+        
+        // Simulate adding two more columns from another file
+        column_names.push("col4".to_string());
+        column_names.push("col5".to_string());
+        
+        let column_type_possibilities = vec![
+            {
+                let mut set = HashSet::new();
+                set.insert(DataType::Int64);
+                set
+            },

Review Comment:
   ```suggestion
               HashSet::from([DataType::Int64])
   ```
   
   FYI, applies for the rest too



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscribe@datafusion.apache.org
For additional commands, e-mail: github-help@datafusion.apache.org

Reply via email to