mengw15 opened a new issue, #5279:
URL: https://github.com/apache/texera/issues/5279

   ### What happened?
   
   A CSV with an empty column header (e.g. a trailing comma in the header row — 
common in Excel exports, third-party CSVs, and auto-generated dumps) crashes 
the operator output writer at flush time. The entire execution result for that 
operator port is lost.
   
   Stack trace:
   
   ```
   java.lang.IllegalArgumentException: Empty name
       at 
org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkArgument(Preconditions.java:143)
       at 
org.apache.iceberg.avro.AvroSchemaUtil.validAvroName(AvroSchemaUtil.java:492)
       at 
org.apache.iceberg.avro.AvroSchemaUtil.makeCompatibleName(AvroSchemaUtil.java:484)
       at 
org.apache.iceberg.parquet.TypeToMessageType.primitive(TypeToMessageType.java:122)
       at 
org.apache.iceberg.parquet.TypeToMessageType.field(TypeToMessageType.java:87)
       at 
org.apache.iceberg.parquet.TypeToMessageType.convert(TypeToMessageType.java:64)
       at 
org.apache.iceberg.parquet.ParquetSchemaUtil.convert(ParquetSchemaUtil.java:41)
       at 
org.apache.iceberg.parquet.Parquet$WriteBuilder.build(Parquet.java:311)
       at 
org.apache.iceberg.parquet.Parquet$DataWriteBuilder.build(Parquet.java:760)
       at 
o.a.t.amber.core.storage.result.iceberg.IcebergTableWriter.flushBuffer(IcebergTableWriter.scala)
       at 
o.a.t.amber.core.storage.result.iceberg.IcebergTableWriter.close(IcebergTableWriter.scala)
       at 
o.a.t.amber.engine.architecture.worker.managers.OutputPortResultWriterThread.run(OutputPortResultWriterThread.scala)
   ```
   
   ### Root cause
   
   
[`CSVScanSourceOpDesc.scala:107-110`](https://github.com/apache/texera/blob/a820f6727/common/workflow-operator/src/main/scala/org/apache/amber/operator/source/scan/csv/CSVScanSourceOpDesc.scala#L107-L110)
 feeds the raw header string straight into `Attribute`:
   
   ```scala
   new Attribute(
     if (hasHeader) firstRow(i) else s"column-${i + 1}",
     attributeTypeList(i)
   )
   ```
   
   If `firstRow(i)` is `""` (CSV with `id,name,,age` → position 2), the 
resulting `Attribute` has name `""`. Same in 
`ParallelCSVScanSourceOpDesc.scala`. Neither sanitizes.
   
   The empty name flows through Schema → Iceberg Table → 
`OutputPortResultWriterThread.run` → `IcebergTableWriter.close → flushBuffer → 
Parquet.build → ParquetSchemaUtil.convert → AvroSchemaUtil.validAvroName`, 
where Avro forbids empty names.
   
   ### Regression context
   
   This is a regression introduced by 
[#3295](https://github.com/apache/texera/pull/3295) ("Use Output Ports of an 
Operator to Write Storage", merged 2025-03-09). Before #3295 sink operators 
wrote results on a path that did not trigger the Parquet/Avro name check, so 
empty-name attributes propagated harmlessly. After #3295 every output port 
writes via `OutputPortResultWriterThread → IcebergTableWriter → Parquet` and 
the validation fires. The CSV scan side has *always* allowed empty headers to 
propagate — the bug pre-existed but was latent.
   
   ### Proposed fix
   
   **Auto-rename empty header positions to `column-<index>`** at CSV scan, 
following the precedent set by pandas (`Unnamed: N`), Spark (`_cN`), R (`VN`), 
and DuckDB (`columnN`).
   
   Concretely, in `CSVScanSourceOpDesc.scala:107-110`:
   
   ```scala
   new Attribute(
     if (hasHeader && firstRow(i).nonEmpty) firstRow(i) else s"column-${i + 1}",
     attributeTypeList(i)
   )
   ```
   
   And the same in `ParallelCSVScanSourceOpDesc.scala`.
   
   A defensive complementary check at `Schema.add` / `Attribute(...)` (reject 
empty names with a clear message naming the offending operator) would make 
future regressions impossible.
   
   ### How to reproduce?
   
   Add this test at 
`common/workflow-operator/src/test/scala/org/apache/amber/operator/source/scan/csv/CSVScanSourceEmptyHeaderSpec.scala`:
   
   ```scala
   package org.apache.amber.operator.source.scan.csv
   
   import org.apache.amber.core.storage.FileResolver
   import org.apache.amber.core.tuple.Schema
   import org.scalatest.flatspec.AnyFlatSpec
   import java.nio.charset.StandardCharsets
   import java.nio.file.Files
   
   class CSVScanSourceEmptyHeaderSpec extends AnyFlatSpec {
     it should "auto-rename empty CSV column headers to column-N" in {
       val tmpFile = Files.createTempFile("empty-header-", ".csv")
       tmpFile.toFile.deleteOnExit()
       val content = "id,name,,age\n1,Alice,x,30\n2,Bob,y,25\n"
       Files.write(tmpFile, content.getBytes(StandardCharsets.UTF_8))
   
       val op = new CSVScanSourceOpDesc()
       op.fileName = Some(tmpFile.toString)
       op.customDelimiter = Some(",")
       op.hasHeader = true
       op.setResolvedFileName(FileResolver.resolve(op.fileName.get))
   
       val schema: Schema = op.sourceSchema()
       val attrNames = schema.getAttributes.map(_.getName).toList
       val emptyPositions =
         attrNames.zipWithIndex.collect { case (n, i) if n.isEmpty => i }
   
       assert(emptyPositions.isEmpty,
         s"Schema has empty attribute names at positions $emptyPositions; got 
$attrNames")
     }
   }
   ```
   
   Run:
   ```
   sbt "WorkflowOperator/testOnly 
org.apache.amber.operator.source.scan.csv.CSVScanSourceEmptyHeaderSpec"
   ```
   
   Output on current `master`:
   ```
   [info] CSVScanSourceEmptyHeaderSpec:
   [info] - should auto-rename empty CSV column headers to column-N *** FAILED 
***
   [info]   List(2) was not empty Schema produced by CSV scan contains empty 
attribute name(s)
   [info]   at positions List(2). Full attribute names: List(id, name, , age).
   [info] *** 1 TEST FAILED ***
   ```
   
   The full-pipeline crash (the stack trace above) is reproducible by running 
the same CSV through any workflow that materializes results via Iceberg.
   
   ### Version
   
   1.1.0-incubating (Pre-release/Master)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to