This is an automated email from the ASF dual-hosted git repository. github-merge-queue[bot] pushed a commit to branch gh-readonly-queue/main/pr-5282-4cc7539ee163fdfafa429ad390fc806137758381 in repository https://gitbox.apache.org/repos/asf/texera.git
commit 59f776c853d3b18a6e385ed9b27b7d4aafc835de Author: Meng Wang <[email protected]> AuthorDate: Thu May 28 17:10:36 2026 -0700 fix(workflow-operator): auto-rename empty CSV column headers (#5282) ### What changes were proposed in this PR? A CSV with an empty column header (e.g. the missing third name in `id,name,,age`, or a trailing comma as in `id,name,age,`) produced an `Attribute` with an empty name. After #3295 every output port writes via `IcebergTableWriter → Parquet`, where Avro rejects empty names with `IllegalArgumentException: Empty name` at flush time — losing the operator port's entire result. Rename empty header positions to `column-<index>` (1-based; a positional fallback in the spirit of those used by pandas, Spark, R, and DuckDB, and the same `column-N` form these operators already generate when `hasHeader` is false) in all three CSV scan operators: `CSVScanSourceOpDesc`, `ParallelCSVScanSourceOpDesc`, and `CSVOldScanSourceOpDesc`. The issue named the first two; the legacy `csvOld` variant is still registered in `LogicalOp` and had the same latent bug. Note: `ParallelCSVScanSourceOpDesc` is currently commented out of `LogicalOp`'s operator registry (so it is not reachable from the UI), but it is fixed here for consistency and so the bug does not resurface if it is re-enabled for experiments. ### Any related issues, documentation, discussions? Closes #5279. ### How was this PR tested? Added three cases to `CSVScanSourceOpDescSpec` — one per operator — feeding a CSV with an empty third header (`id,name,,age`) and asserting the inferred schema is `["id", "name", "column-3", "age"]`. `WorkflowOperator` suite is green (13 passed); scalafmt clean. ### Was this PR authored or co-authored using generative AI tooling? 
Generated-by: Claude Code (claude-opus-4-7) --- .../source/scan/csv/CSVScanSourceOpDesc.scala | 6 ++- .../scan/csv/ParallelCSVScanSourceOpDesc.scala | 7 ++- .../scan/csvOld/CSVOldScanSourceOpDesc.scala | 7 ++- .../source/scan/csv/CSVScanSourceOpDescSpec.scala | 50 ++++++++++++++++++++++ 4 files changed, 65 insertions(+), 5 deletions(-) diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csv/CSVScanSourceOpDesc.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csv/CSVScanSourceOpDesc.scala index 57b173583e..d32bbb31c6 100644 --- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csv/CSVScanSourceOpDesc.scala +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csv/CSVScanSourceOpDesc.scala @@ -120,7 +120,11 @@ class CSVScanSourceOpDesc extends ScanSourceOpDesc { else (1 to attributeTypeList.length).map(i => "column-" + i).toArray header.indices.foldLeft(Schema()) { (schema, i) => - schema.add(header(i), attributeTypeList(i)) + // Auto-rename blank header positions to `column-N` so empty CSV headers + // (e.g. a trailing comma) do not propagate empty attribute names to + // downstream Iceberg/Parquet writers, which reject them. 
+ val name = Option(header(i)).filter(_.nonEmpty).getOrElse(s"column-${i + 1}") + schema.add(name, attributeTypeList(i)) } } diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csv/ParallelCSVScanSourceOpDesc.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csv/ParallelCSVScanSourceOpDesc.scala index 6ac2d20af4..9609b4e239 100644 --- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csv/ParallelCSVScanSourceOpDesc.scala +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csv/ParallelCSVScanSourceOpDesc.scala @@ -104,10 +104,13 @@ class ParallelCSVScanSourceOpDesc extends ScanSourceOpDesc { reader.close() - // build schema based on inferred AttributeTypes + // build schema based on inferred AttributeTypes. + // Auto-rename blank header positions to `column-N` so empty CSV headers + // (e.g. a trailing comma) do not propagate empty attribute names to + // downstream Iceberg/Parquet writers, which reject them. 
Schema().add(firstRow.indices.map { i => new Attribute( - if (hasHeader) firstRow(i) else s"column-${i + 1}", + if (hasHeader && firstRow(i).nonEmpty) firstRow(i) else s"column-${i + 1}", attributeTypeList(i) ) }) diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csvOld/CSVOldScanSourceOpDesc.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csvOld/CSVOldScanSourceOpDesc.scala index 08b5ea1ce1..cdf9e655cc 100644 --- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csvOld/CSVOldScanSourceOpDesc.scala +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csvOld/CSVOldScanSourceOpDesc.scala @@ -102,10 +102,13 @@ class CSVOldScanSourceOpDesc extends ScanSourceOpDesc { reader.close() - // build schema based on inferred AttributeTypes + // build schema based on inferred AttributeTypes. + // Auto-rename blank header positions to `column-N` so empty CSV headers + // (e.g. a trailing comma) do not propagate empty attribute names to + // downstream Iceberg/Parquet writers, which reject them. 
Schema().add(firstRow.indices.map { i => new Attribute( - if (hasHeader) firstRow(i) else s"column-${i + 1}", + if (hasHeader && firstRow(i).nonEmpty) firstRow(i) else s"column-${i + 1}", attributeTypeList(i) ) }) diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/csv/CSVScanSourceOpDescSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/csv/CSVScanSourceOpDescSpec.scala index eb174e0691..87b86fea6d 100644 --- a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/csv/CSVScanSourceOpDescSpec.scala +++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/csv/CSVScanSourceOpDescSpec.scala @@ -26,9 +26,13 @@ import org.apache.texera.amber.core.workflow.WorkflowContext.{ DEFAULT_WORKFLOW_ID } import org.apache.texera.amber.operator.TestOperators +import org.apache.texera.amber.operator.source.scan.csvOld.CSVOldScanSourceOpDesc import org.scalatest.BeforeAndAfter import org.scalatest.flatspec.AnyFlatSpec +import java.nio.charset.StandardCharsets +import java.nio.file.Files + class CSVScanSourceOpDescSpec extends AnyFlatSpec with BeforeAndAfter { var csvScanSourceOpDesc: CSVScanSourceOpDesc = _ @@ -38,6 +42,18 @@ class CSVScanSourceOpDescSpec extends AnyFlatSpec with BeforeAndAfter { parallelCsvScanSourceOpDesc = new ParallelCSVScanSourceOpDesc() } + // Writes a CSV whose header row has an empty column (the third position), + // e.g. `id,name,,age`, and returns the absolute path. 
+ private def writeCsvWithEmptyHeader(): String = { + val tmpFile = Files.createTempFile("empty-header-", ".csv") + tmpFile.toFile.deleteOnExit() + Files.write( + tmpFile, + "id,name,,age\n1,Alice,x,30\n2,Bob,y,25\n".getBytes(StandardCharsets.UTF_8) + ) + tmpFile.toString + } + it should "infer schema from single-line-data csv" in { parallelCsvScanSourceOpDesc.fileName = Some(TestOperators.CountrySalesSmallCsvPath) @@ -160,4 +176,38 @@ class CSVScanSourceOpDescSpec extends AnyFlatSpec with BeforeAndAfter { assert(csvScanSourceOpDesc.customDelimiter.contains(",")) } + it should "auto-rename empty CSV column headers to column-N" in { + val path = writeCsvWithEmptyHeader() + csvScanSourceOpDesc.fileName = Some(path) + csvScanSourceOpDesc.customDelimiter = Some(",") + csvScanSourceOpDesc.hasHeader = true + csvScanSourceOpDesc.setResolvedFileName(FileResolver.resolve(path)) + + val names = csvScanSourceOpDesc.sourceSchema().getAttributes.map(_.getName).toList + assert(names == List("id", "name", "column-3", "age")) + } + + it should "auto-rename empty CSV column headers to column-N for parallel CSV" in { + val path = writeCsvWithEmptyHeader() + parallelCsvScanSourceOpDesc.fileName = Some(path) + parallelCsvScanSourceOpDesc.customDelimiter = Some(",") + parallelCsvScanSourceOpDesc.hasHeader = true + parallelCsvScanSourceOpDesc.setResolvedFileName(FileResolver.resolve(path)) + + val names = parallelCsvScanSourceOpDesc.sourceSchema().getAttributes.map(_.getName).toList + assert(names == List("id", "name", "column-3", "age")) + } + + it should "auto-rename empty CSV column headers to column-N for old CSV" in { + val path = writeCsvWithEmptyHeader() + val oldCsvScanSourceOpDesc = new CSVOldScanSourceOpDesc() + oldCsvScanSourceOpDesc.fileName = Some(path) + oldCsvScanSourceOpDesc.customDelimiter = Some(",") + oldCsvScanSourceOpDesc.hasHeader = true + oldCsvScanSourceOpDesc.setResolvedFileName(FileResolver.resolve(path)) + + val names = 
oldCsvScanSourceOpDesc.sourceSchema().getAttributes.map(_.getName).toList + assert(names == List("id", "name", "column-3", "age")) + } + }
