pvary commented on PR #9308: URL: https://github.com/apache/iceberg/pull/9308#issuecomment-1860397381
> These messages are executed in the same thread as the fetch method, so in this case we have to return from the fetch loop, even with empty results. This is somewhat concerning to me considering the comment here:

I have checked, and idleness is defined like this:
https://github.com/apache/flink/blob/1d33773e6b7f9f76f03ff8ffd73171b95fa24ccb/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java#L338-L345

```java
boolean isIdle() {
    lock.lock();
    try {
        return assignedSplits.isEmpty() && taskQueue.isEmpty() && runningTask == null;
    } finally {
        lock.unlock();
    }
}
```

So if there are pending tasks (split manipulation/pause), the Fetcher is not idle, and therefore it is not removed.

The Fetcher also handles the `wakeUp` flag, so we are golden:
https://github.com/apache/flink/blob/c28dd942e3dded4cab7a33037077a1d3bfa0929e/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java#L61-L71

```java
if (!isWakenUp()) {
    // The order matters here. We must first put the last records into the queue.
    // This ensures the handling of the fetched records is atomic to wakeup.
    if (elementsQueue.put(fetcherIndex, lastRecords)) {
        if (!lastRecords.finishedSplits().isEmpty()) {
            // The callback does not throw InterruptedException.
            splitFinishedCallback.accept(lastRecords.finishedSplits());
        }
        lastRecords = null;
    }
}
```
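To make the two properties above concrete, here is a minimal, hypothetical Java sketch: a fetcher with a queued task is never idle (so it is not removed), and a fetched batch, even an empty one, is handed over only when no wakeup arrived, otherwise it is kept for the next round. The class names `SketchFetcher` and `SketchFetchTask`, the `String` record type, the `Supplier`-based reader, and the flag reset at the end of `run()` are all assumptions for illustration; this is a simplified model, not Flink's actual `SplitFetcher`/`FetchTask` code.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

// Hypothetical, heavily simplified model of the behaviour discussed above.
// Names mirror Flink's SplitFetcher/FetchTask, but this is NOT Flink's code.
class SketchFetcher {
    private final ReentrantLock lock = new ReentrantLock();
    final List<String> assignedSplits = new ArrayList<>();
    final Queue<Runnable> taskQueue = new ArrayDeque<>();
    Runnable runningTask = null;

    // Same predicate as the quoted SplitFetcher#isIdle(): a queued task
    // (e.g. split manipulation or pause) is enough to keep the fetcher alive.
    boolean isIdle() {
        lock.lock();
        try {
            return assignedSplits.isEmpty() && taskQueue.isEmpty() && runningTask == null;
        } finally {
            lock.unlock();
        }
    }
}

class SketchFetchTask {
    private volatile boolean wakenUp = false;
    private List<String> lastRecords = null; // fetched but not yet handed over
    final Queue<List<String>> elementsQueue = new ArrayDeque<>();

    void wakeUp() {
        wakenUp = true;
    }

    // One fetch round. A batch (possibly empty) is handed over only when no
    // wakeup arrived; otherwise it is kept and delivered on the next round,
    // so the batch is neither lost nor fetched twice.
    void run(Supplier<List<String>> reader) {
        if (lastRecords == null) {
            lastRecords = reader.get();
        }
        if (!wakenUp) {
            elementsQueue.add(lastRecords);
            lastRecords = null;
        }
        // Sketch assumption: a wakeup only aborts the current round.
        wakenUp = false;
    }
}
```

In this model an empty batch still returns control from `run()`, matching the concern that the fetch loop has to return even with empty results, while the idleness check keeps the fetcher registered as long as work is queued.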