mengw15 opened a new issue, #5279:
URL: https://github.com/apache/texera/issues/5279
### What happened?
A CSV with an empty column header (e.g. a trailing comma in the header row —
common in Excel exports, third-party CSVs, and auto-generated dumps) crashes
the operator output writer at flush time. The entire execution result for that
operator port is lost.
Stack trace:
```
java.lang.IllegalArgumentException: Empty name
	at org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkArgument(Preconditions.java:143)
	at org.apache.iceberg.avro.AvroSchemaUtil.validAvroName(AvroSchemaUtil.java:492)
	at org.apache.iceberg.avro.AvroSchemaUtil.makeCompatibleName(AvroSchemaUtil.java:484)
	at org.apache.iceberg.parquet.TypeToMessageType.primitive(TypeToMessageType.java:122)
	at org.apache.iceberg.parquet.TypeToMessageType.field(TypeToMessageType.java:87)
	at org.apache.iceberg.parquet.TypeToMessageType.convert(TypeToMessageType.java:64)
	at org.apache.iceberg.parquet.ParquetSchemaUtil.convert(ParquetSchemaUtil.java:41)
	at org.apache.iceberg.parquet.Parquet$WriteBuilder.build(Parquet.java:311)
	at org.apache.iceberg.parquet.Parquet$DataWriteBuilder.build(Parquet.java:760)
	at o.a.t.amber.core.storage.result.iceberg.IcebergTableWriter.flushBuffer(IcebergTableWriter.scala)
	at o.a.t.amber.core.storage.result.iceberg.IcebergTableWriter.close(IcebergTableWriter.scala)
	at o.a.t.amber.engine.architecture.worker.managers.OutputPortResultWriterThread.run(OutputPortResultWriterThread.scala)
```
### Root cause
[`CSVScanSourceOpDesc.scala:107-110`](https://github.com/apache/texera/blob/a820f6727/common/workflow-operator/src/main/scala/org/apache/amber/operator/source/scan/csv/CSVScanSourceOpDesc.scala#L107-L110)
feeds the raw header string straight into `Attribute`:
```scala
new Attribute(
if (hasHeader) firstRow(i) else s"column-${i + 1}",
attributeTypeList(i)
)
```
If `firstRow(i)` is `""` (for example, the header `id,name,,age` yields `""` at
position 2), the resulting `Attribute` has the name `""`.
`ParallelCSVScanSourceOpDesc.scala` follows the same pattern, and neither
operator sanitizes header names.
The empty name flows through Schema → Iceberg Table →
`OutputPortResultWriterThread.run` → `IcebergTableWriter.close → flushBuffer →
Parquet.build → ParquetSchemaUtil.convert → AvroSchemaUtil.validAvroName`,
where Avro forbids empty names.
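The failure mode can be reproduced in isolation with a few lines of plain Scala. This is a simplified stand-in, not Texera or Iceberg code: `headerFields` splits on commas instead of using the operator's CSV parser, and `avroNameCheck` only mimics the `Empty name` precondition seen in the stack trace. Both names are hypothetical.

```scala
// Hypothetical stand-in for the failure path; NOT Texera or Iceberg code.
object EmptyHeaderFailureSketch {

  // Naive header split used only for illustration (no quoting support).
  // A negative limit keeps trailing empty fields, e.g. from a trailing comma;
  // interior empty fields such as the one in "id,name,,age" are kept either way.
  def headerFields(headerLine: String): Array[String] =
    headerLine.split(",", -1)

  // Mimics the precondition from the stack trace: empty names are rejected
  // with an IllegalArgumentException whose message is "Empty name".
  def avroNameCheck(name: String): String = {
    if (name.isEmpty) throw new IllegalArgumentException("Empty name")
    name
  }
}
```

For example, `headerFields("id,name,,age")` returns `Array(id, name, "", age)`, and passing each field through `avroNameCheck` throws `IllegalArgumentException: Empty name` at index 2. The `-1` limit matters for the trailing-comma case: `"id,name,".split(",")` silently drops the trailing empty field, whereas `split(",", -1)` keeps it, as a CSV parser does.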
### Regression context
This is a regression introduced by
[#3295](https://github.com/apache/texera/pull/3295) ("Use Output Ports of an
Operator to Write Storage", merged 2025-03-09). Before #3295 sink operators
wrote results on a path that did not trigger the Parquet/Avro name check, so
empty-name attributes propagated harmlessly. After #3295 every output port
writes via `OutputPortResultWriterThread → IcebergTableWriter → Parquet` and
the validation fires. The CSV scan side has *always* allowed empty headers to
propagate — the bug pre-existed but was latent.
### Proposed fix
**Auto-rename empty header positions to `column-<index>`** at CSV scan,
following the precedent set by pandas (`Unnamed: N`), Spark (`_cN`), R (`VN`),
and DuckDB (`columnN`).
Concretely, in `CSVScanSourceOpDesc.scala:107-110`:
```scala
new Attribute(
if (hasHeader && firstRow(i).nonEmpty) firstRow(i) else s"column-${i + 1}",
attributeTypeList(i)
)
```
And the same in `ParallelCSVScanSourceOpDesc.scala`.
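The proposed rule can also be expressed as a small pure function, which makes it easy to unit-test apart from file I/O and to share between the two scan operators. This is a sketch under assumptions: `HeaderNaming.resolveColumnNames` is a hypothetical helper, not existing Texera code, and it additionally treats a `null` header cell as missing, in case the CSV parser is configured to return `null` rather than `""` for empty fields (the reproduction in this issue shows `""`).

```scala
// Hypothetical helper illustrating the proposed renaming rule; not Texera code.
object HeaderNaming {

  /** Returns one attribute name per column of `firstRow`.
    *
    * With a header row, a non-empty header cell is kept as-is; an empty or
    * null cell falls back to `column-<1-based index>`, matching the existing
    * `s"column-${i + 1}"` fallback. Without a header row, every column gets
    * the fallback name.
    */
  def resolveColumnNames(firstRow: Array[String], hasHeader: Boolean): Seq[String] =
    firstRow.indices.map { i =>
      Option(firstRow(i))                          // treat null like a missing header
        .filter(name => hasHeader && name.nonEmpty) // keep only real, non-empty headers
        .getOrElse(s"column-${i + 1}")              // otherwise use the positional name
    }
}
```

Applied to the header `id,name,,age`, it yields `id, name, column-3, age`: the index is 1-based, so position 2 becomes `column-3`. One caveat this sketch does not handle: if the file already contains a header literally named `column-3`, the result contains a duplicate name.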
A complementary defensive check at `Schema.add` / `Attribute(...)` that rejects
empty names with a clear message naming the offending operator would catch any
future regression of this kind at schema construction, instead of as an opaque
Avro error at flush time.
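The defensive check could look roughly like the following. `AttributeNameGuard.requireValidAttributeName` and its message format are hypothetical: the issue only suggests rejecting empty names at `Schema.add` / `Attribute(...)` with a message that names the offending operator, and the actual hook point and operator identifier would depend on Texera's internals.

```scala
// Hypothetical guard; neither this object nor its message exist in Texera.
object AttributeNameGuard {

  // Returns `name` unchanged if it is usable as an attribute name; otherwise
  // fails fast with a message that identifies the operator that produced it.
  def requireValidAttributeName(name: String, operatorId: String): String = {
    if (name == null || name.isEmpty)
      throw new IllegalArgumentException(
        s"Operator '$operatorId' produced an attribute with an empty name; " +
          "attribute names must be non-empty"
      )
    name
  }
}
```

Invoked while the schema is being built, a guard like this reports the problem with the operator named in the message, rather than as a bare `Empty name` from Avro after the operator has already run.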
### How to reproduce?
Add this test at
`common/workflow-operator/src/test/scala/org/apache/amber/operator/source/scan/csv/CSVScanSourceEmptyHeaderSpec.scala`:
```scala
package org.apache.amber.operator.source.scan.csv

import org.apache.amber.core.storage.FileResolver
import org.apache.amber.core.tuple.Schema
import org.scalatest.flatspec.AnyFlatSpec

import java.nio.charset.StandardCharsets
import java.nio.file.Files

class CSVScanSourceEmptyHeaderSpec extends AnyFlatSpec {

  it should "auto-rename empty CSV column headers to column-N" in {
    // Write a CSV whose header row has an empty third column: id,name,,age
    val tmpFile = Files.createTempFile("empty-header-", ".csv")
    tmpFile.toFile.deleteOnExit()
    val content = "id,name,,age\n1,Alice,x,30\n2,Bob,y,25\n"
    Files.write(tmpFile, content.getBytes(StandardCharsets.UTF_8))

    // Configure the CSV scan operator to read the file with a header row
    val op = new CSVScanSourceOpDesc()
    op.fileName = Some(tmpFile.toString)
    op.customDelimiter = Some(",")
    op.hasHeader = true
    op.setResolvedFileName(FileResolver.resolve(op.fileName.get))

    // Infer the schema and collect the positions whose attribute name is empty
    val schema: Schema = op.sourceSchema()
    val attrNames = schema.getAttributes.map(_.getName).toList
    val emptyPositions =
      attrNames.zipWithIndex.collect { case (n, i) if n.isEmpty => i }

    assert(
      emptyPositions.isEmpty,
      s"Schema has empty attribute names at positions $emptyPositions; got $attrNames"
    )
  }
}
```
Run:
```
sbt "WorkflowOperator/testOnly org.apache.amber.operator.source.scan.csv.CSVScanSourceEmptyHeaderSpec"
```
Output on current `master`:
```
[info] CSVScanSourceEmptyHeaderSpec:
[info] - should auto-rename empty CSV column headers to column-N *** FAILED ***
[info] List(2) was not empty Schema produced by CSV scan contains empty attribute name(s)
[info] at positions List(2). Full attribute names: List(id, name, , age).
[info] *** 1 TEST FAILED ***
```
The full-pipeline crash (the stack trace above) is reproducible by running
the same CSV through any workflow that materializes results via Iceberg.
### Version
1.1.0-incubating (Pre-release/Master)