xuzhiwen1255 commented on PR #6614: URL: https://github.com/apache/iceberg/pull/6614#issuecomment-1407547988
I'm sorry, I've been spending time with my family recently, so I haven't been able to join the discussion of this issue. I would like to share my opinion.

> I think that if a catalog is closed, it's reasonable for tables to stop operating as well. The catalog manages its shared resources and if it chooses to share a connection pool with tables then it makes sense for the tables to no longer be able to connect after the pool is closed.
>
> Tables should not own their own connection pools, so some resource needs to manage them and the catalog is a good place to do that.
>
> I think the problem is that the `TableLoader` is taking ownership of the catalog and closing it. That seems incorrect to me.

+1, I think that after the catalog is closed, the table should be closed as well. I think this is a code bug. Suppose we use a catalog other than `JdbcCatalog`, and it passes some closeable objects to its tables: the same problem still exists once that catalog is closed. Therefore, we need to avoid using a table loaded by a catalog after that catalog has been closed, because doing so leads to unexpected behavior. In fact, some of these problems have already been exposed.

> I have thought of a way to work around this problem; I wonder if it is feasible.

Pseudocode:

```java
import java.io.Closeable;
import java.io.IOException;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.TableLoader;

// Abstract out a table accessor. All access to the table goes through this accessor,
// which manages the life cycle of the table and of the catalog behind the loader.
public class TableAccessor implements Closeable {
  private final TableLoader tableLoader;
  private Table table;

  public TableAccessor(TableLoader tableLoader) {
    this.tableLoader = tableLoader;
  }

  // Loads the table on first use. The loader is deliberately NOT closed here
  // (no try-with-resources): it stays open until close() is called on the accessor.
  public Table lazyTable() {
    if (table == null) {
      tableLoader.open();
      this.table = tableLoader.loadTable();
    }
    return table;
  }

  @Override
  public void close() throws IOException {
    tableLoader.close();
  }
}

// ----- IcebergSource -----
private TableAccessor tableAccessor;

IcebergSource(TableLoader tableLoader) {
  this.tableLoader = tableLoader;
  // The accessor is created when the IcebergSource is built. Every later operation on
  // the table, or retrieval of a reference to it, goes through the accessor.
  this.tableAccessor = new TableAccessor(tableLoader);
}

private List<IcebergSourceSplit> planSplitsForBatch(String threadName) {
  ExecutorService workerPool =
      ThreadPools.newWorkerPool(threadName, scanContext.planParallelism());
  try {
    Table table = tableAccessor.lazyTable();
    List<IcebergSourceSplit> splits =
        FlinkSplitPlanner.planIcebergSourceSplits(table, scanContext, workerPool);
    LOG.info(
        "Discovered {} splits from table {} during job initialization",
        splits.size(),
        table.name());
    return splits;
  } finally {
    workerPool.shutdown();
    // Close the accessor once planning is done.
    try {
      tableAccessor.close();
    } catch (IOException e) {
      throw new UncheckedIOException("Failed to close table accessor", e);
    }
  }
}
```

For streaming mode, we pass the accessor directly to `ContinuousSplitPlannerImpl`, which closes the accessor.

```java
private TableAccessor tableAccessor;

public ContinuousSplitPlannerImpl(TableAccessor tableAccessor .......) {
  this.tableAccessor = tableAccessor;
  .....
}

@Override
public void close() throws IOException {
  if (!isSharedPool) {
    workerPool.shutdown();
  }
  tableAccessor.close();
}
```

In this way, the table is always accessed through its accessor, and the `tableLoader` is maintained by the accessor.
When the accessor is closed, we close the `tableLoader`. This ensures that the loader is closed correctly and that the objects referenced by the table do not become unavailable because the catalog was closed first.

@stevenzwu @pvary @hililiwei What do you think of this plan? Please correct me if I'm wrong.
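
To make the life-cycle argument concrete, here is a minimal, self-contained sketch that runs without Iceberg. `FakeLoader`, `Accessor`, and the `"snapshot-1"` value are hypothetical stand-ins invented for illustration; they are not Iceberg classes and only mimic the idea that a loaded table depends on resources owned by its loader. The first method loads and then immediately closes the loader (the problematic pattern), the second keeps the loader open until the accessor is closed.

```java
import java.io.Closeable;
import java.util.function.Supplier;

public class AccessorLifecycleSketch {

  /** Hypothetical stand-in for a loader whose catalog owns a shared resource. */
  static final class FakeLoader implements Closeable {
    private boolean open;

    void open() {
      open = true;
    }

    /** Returns a "table" whose reads only work while this loader is still open. */
    Supplier<String> loadTable() {
      if (!open) {
        throw new IllegalStateException("loader is not open");
      }
      return () -> {
        if (!open) {
          throw new IllegalStateException("catalog resources already closed");
        }
        return "snapshot-1";
      };
    }

    @Override
    public void close() {
      open = false;
    }
  }

  /** Hypothetical accessor: keeps the loader open until the accessor itself is closed. */
  static final class Accessor implements Closeable {
    private final FakeLoader loader;
    private Supplier<String> table;

    Accessor(FakeLoader loader) {
      this.loader = loader;
    }

    Supplier<String> table() {
      if (table == null) {
        loader.open();
        table = loader.loadTable();
      }
      return table;
    }

    @Override
    public void close() {
      loader.close();
    }
  }

  /** Load-then-close pattern: the loader is closed as soon as the table is returned. */
  static Supplier<String> loadAndClose(FakeLoader loader) {
    loader.open();
    try (FakeLoader l = loader) {
      return l.loadTable();
    }
  }

  /** Returns true if reading the table fails once its loader has been closed. */
  public static boolean readFailsAfterEagerClose() {
    Supplier<String> table = loadAndClose(new FakeLoader());
    try {
      table.get();
      return false;
    } catch (IllegalStateException e) {
      return true;
    }
  }

  /** Reads through the accessor, which closes the loader only after the read. */
  public static String readThroughAccessor() {
    try (Accessor accessor = new Accessor(new FakeLoader())) {
      return accessor.table().get();
    }
  }

  public static void main(String[] args) {
    System.out.println("eager close breaks reads: " + readFailsAfterEagerClose());
    System.out.println("read via accessor: " + readThroughAccessor());
  }
}
```

The point of the sketch is only the ordering: whoever holds the table must also control when the loader is closed, which is the role the proposed `TableAccessor` plays.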