zeroshade commented on code in PR #1368:
URL: https://github.com/apache/iceberg-go/pull/1368#discussion_r3522375660


##########
table/rolling_data_writer.go:
##########
@@ -440,13 +441,22 @@ func (r *RollingDataWriter) sendError(err error) {
        }
 }
 
-func (r *RollingDataWriter) close() {
+// closeInput gracefully closes the record channel so any queued writes can still be
+// processed while honoring context cancellation checks in the stream.
+func (r *RollingDataWriter) closeInput() {
+       r.closeRecordCh.Do(func() {
+               close(r.recordCh)
+       })
+}
+
+// abort cancels in-flight work and then closes input so the stream can exit.
+func (r *RollingDataWriter) abort() {
        r.cancel()
-       close(r.recordCh)
+       r.closeInput()
 }
 
 func (r *RollingDataWriter) closeAndWait() error {
-       r.close()
+       r.closeInput()
        r.factory.writers.Delete(r.partitionKey)

Review Comment:
   [MINOR] On the normal path, `closeAndWait` no longer calls the writer's
`CancelFunc`, so the child context created by `context.WithCancel` stays alive
until its parent is canceled. Call `cancel` once the stream has finished
draining, e.g. after `wg.Wait()`.



##########
table/rolling_data_writer_test.go:
##########
@@ -187,6 +187,67 @@ func (s *RollingDataWriterTestSuite) TestRollsMultipleFiles() {
        s.Equal(totalRows, actualRows)
 }
 
+func (s *RollingDataWriterTestSuite) TestCloseAllFinishesQueuedRecordsWithoutCancellingContext() {
+       arrSchema := arrow.NewSchema([]arrow.Field{
+               {Name: "id", Type: arrow.PrimitiveTypes.Int32, Nullable: true},
+               {Name: "name", Type: arrow.BinaryTypes.String, Nullable: true},
+       }, nil)
+
+       loc := filepath.ToSlash(s.T().TempDir())
+       factory, _ := s.createWriterFactory(loc, arrSchema, 1024*1024)
+
+       outputCh := make(chan iceberg.DataFile, 10)
+       writerCtx, cancel := context.WithCancel(s.ctx)
+       defer cancel()

Review Comment:
   [MINOR] This regression test derives the writer context from a live context
and calls `closeAll()` before canceling the parent, so it does not exercise the
fanout-after-`errgroup`-cancel scenario. Add a test where records are queued,
the fanout workers all return successfully, `Wait()` cancels the `errgroup`
context, and `closeAll()` must still flush every row without failing with
`context.Canceled`.



##########
table/rolling_data_writer.go:
##########
@@ -440,13 +441,22 @@ func (r *RollingDataWriter) sendError(err error) {
        }
 }
 
-func (r *RollingDataWriter) close() {
+// closeInput gracefully closes the record channel so any queued writes can still be
+// processed while honoring context cancellation checks in the stream.
+func (r *RollingDataWriter) closeInput() {
+       r.closeRecordCh.Do(func() {
+               close(r.recordCh)
+       })
+}
+
+// abort cancels in-flight work and then closes input so the stream can exit.
+func (r *RollingDataWriter) abort() {
        r.cancel()
-       close(r.recordCh)
+       r.closeInput()
 }
 
 func (r *RollingDataWriter) closeAndWait() error {

Review Comment:
   [BLOCKER] This graceful path avoids calling `cancel`, but it still does not 
cover the fanout case: in `partitioned_fanout_writer.go` the rolling writers 
are created from the `errgroup.WithContext` context around line 160, and 
`fanoutWorkers.Wait()` cancels that context before `closeAll()` runs around 
line 188. Records drained here can still run through 
`stream()`/`openFileWriter`/`ToRequestedSchema`/`SortRecordBatch` with a 
canceled `r.ctx` and fail with `context.Canceled`. Fix by decoupling the fanout 
coordination context from the per-writer drain context: after a successful 
`Wait()`, drain with a context not canceled by `errgroup.Wait()`; on `Wait()` 
error or consumer abort, use the abort path that cancels writers before closing 
input.



-- 
This is an automated message from the Apache Git Service.